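Reading the Hive Source Code: Why Big Data Development Can Break Free of SQL, Java, and Scala
Preface
[This article is for readers with some computer science background or about half a year of work experience. Planting a flag here: may the world be free of shallow SQL Boys.]
When it comes to big data development, SQL Boys make up the vast majority, and I won't take rebuttals. Big data mostly serves machine learning and statistical reporting, so plenty of people have moved over from Oracle database development knowing only a few SQL statements, and the few who can write spark.sql("a SQL string") in Python already count as the talent among SQL Boys. Such entry-level developers, who can only handle structured tables, are the meme we keep mentioning: the shallow SQL Boy. They do not understand big data at all, their thinking is stuck in the database era, and they use every big data component as if it were an RDBMS. The technical level of this kind of business developer is hard to praise.
Then there are those who came from Java back-end development. It is not a perfect fit, but they can drive Spark and Flink fluently from a single main method, hand-write some JDBC, and do simple secondary development. These are the platform developers, and their technical level is somewhat higher. If your Java is good, Scala comes quickly too.
But that does not mean big data can only be done with SQL, Java, or Scala. Being that narrow is not much better than being a SQL Boy.
I started out in embedded development, so I know perfectly well that C, C#, and C++ can do big data too.
This article takes drop table in Hive, the most common data warehouse component in big data development, as a starting example to explain why big data development can break free of SQL, Java, and Scala.
Why You Can Break Free of SQL
Data is either structured or unstructured. SQL can only handle a very limited set of structured tables [RDBMS tables, tidy csv/tsv files, and so on]; it is helpless against most semi-structured and unstructured data [log files, audio, images, and so on]. The ancient MapReduce could not be driven by SQL at all, and early versions of Spark and Flink were purely API-based. People lived just fine in the years without SQL. Friendlier SQL support in big data components came later, mainly to lower the barrier so that SQL Boys could use big data technology too.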
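To make the point about semi-structured data concrete, here is a minimal sketch in plain Java that pulls fields out of log lines with a regular expression. The log format, the class name LogLineParser and the sample lines are all made up for illustration; a real job would presumably run the same kind of logic inside a Spark or Flink operator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical log format: "2023-03-22 22:57:01 [ERROR] message text"
class LogLineParser {
    private static final Pattern LINE =
        Pattern.compile("^(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}) \\[(\\w+)\\] (.*)$");

    /** Returns {timestamp, level, message}, or null when the line does not match. */
    static String[] parse(String line) {
        Matcher m = LINE.matcher(line);
        if (!m.matches()) {
            return null;
        }
        return new String[] {m.group(1), m.group(2), m.group(3)};
    }

    /** Keeps only the messages of lines at the given level; malformed lines are skipped. */
    static List<String> messagesAt(String level, List<String> lines) {
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            String[] fields = parse(line);
            if (fields != null && fields[1].equals(level)) {
                out.add(fields[2]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "2023-03-22 22:57:01 [INFO] job started",
            "garbage that is not a log line",
            "2023-03-22 22:57:05 [ERROR] table not found");
        System.out.println(messagesAt("ERROR", lines));
    }
}
```

Lines that do not fit the pattern are simply skipped, which is exactly the kind of tolerance a fixed table schema does not give you.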
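Shallow SQL Boys, of course, only know this:
drop table db_name.tb_name;
Normally the Hive table gets dropped, and their understanding stops at "Hive is a database".
Platform developers, however, know to look through Hive's Java API: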
https://svn.apache.org/repos/infra/websites/production/hive/content/javadocs/r3.1.3/api/index.html
and know that there is also this way:
package com.zhiyong;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

/**
 * @program: zhiyong_study
 * @description: Test the MetaStore
 * @author: zhiyong
 * @create: 2023-03-22 22:57
 **/
public class MetaStoreDemo {
    public static void main(String[] args) throws Exception {
        // HiveConf picks up hive-site.xml from the classpath
        HiveConf hiveConf = new HiveConf();
        HiveMetaStoreClient client = new HiveMetaStoreClient(hiveConf);
        try {
            // Drop db_name.tb_name through the metastore API; no DDL statement involved
            client.dropTable("db_name", "tb_name");
        } finally {
            client.close();
        }
    }
}
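Calling the API drops the table just as well, so DDL is clearly not required. HiveMetaStoreClient can also create tables and perform other operations.
Platform developers who understand the internals have an even more brutal option: connect directly to the MySQL database where Hive stores its metadata and perform precise CRUD on the metadata tables.
ETL and other computations over structured tables can be written entirely with Spark's DataFrame or Flink's DataStream API, with no SQL involved. Whatever SQL can do, Java and Scala can do; what SQL cannot do, Java and Scala can still do.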
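As a rough illustration of "whatever SQL can do, an API can do", the sketch below expresses SELECT dept, SUM(amount) ... WHERE amount > 0 GROUP BY dept with plain Java Streams. It deliberately uses only the JDK rather than Spark's DataFrame or Flink's DataStream API, so it runs without a cluster; the Order class and the sample rows are invented for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class GroupByWithoutSql {
    /** Hypothetical row type standing in for a structured table. */
    static final class Order {
        private final String dept;
        private final long amount;

        Order(String dept, long amount) {
            this.dept = dept;
            this.amount = amount;
        }

        String dept() {
            return dept;
        }

        long amount() {
            return amount;
        }
    }

    /** Equivalent of: SELECT dept, SUM(amount) FROM orders WHERE amount > 0 GROUP BY dept */
    static Map<String, Long> totalByDept(List<Order> orders) {
        return orders.stream()
            .filter(o -> o.amount() > 0)
            .collect(Collectors.groupingBy(
                Order::dept, TreeMap::new, Collectors.summingLong(Order::amount)));
    }

    public static void main(String[] args) {
        List<Order> orders = Arrays.asList(
            new Order("bi", 10), new Order("ml", 5), new Order("bi", 7), new Order("ml", -1));
        System.out.println(totalByDept(orders));
    }
}
```

The TreeMap is only there to keep the printed keys in a stable order; a DataFrame version would chain filter, groupBy and an aggregation in much the same shape.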
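Apart from RDBMSs and assorted re-skinned products [thin wrappers around open-source Apache components], I really cannot think of a scenario where SQL is the only option.
That is enough to show that big data can break free of SQL.
Why You Can Break Free of Java
Hive is written in Java under the hood, but that does not mean Hive can only be operated from Java. With an understanding that shallow, you deserve a lifetime of tuning parameters and calling APIs.
Finding the actual entry point of dropTable
In the Hive 3.1.2 source code we can find the dropTable methods: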
@Override
public void dropTable(String dbname, String name, boolean deleteData,
        boolean ignoreUnknownTab) throws MetaException, TException,
        NoSuchObjectException, UnsupportedOperationException {
    dropTable(getDefaultCatalog(conf), dbname, name, deleteData, ignoreUnknownTab, null);
}

@Override
public void dropTable(String dbname, String name, boolean deleteData,
        boolean ignoreUnknownTab, boolean ifPurge) throws TException {
    dropTable(getDefaultCatalog(conf), dbname, name, deleteData, ignoreUnknownTab, ifPurge);
}

@Override
public void dropTable(String dbname, String name) throws TException {
    dropTable(getDefaultCatalog(conf), dbname, name, true, true, null);
}

@Override
public void dropTable(String catName, String dbName, String tableName, boolean deleteData,
        boolean ignoreUnknownTable, boolean ifPurge) throws TException {
    // build new environmentContext with ifPurge;
    EnvironmentContext envContext = null;
    if (ifPurge) {
        Map<String, String> warehouseOptions;
        warehouseOptions = new HashMap<>();
        warehouseOptions.put("ifPurge", "TRUE");
        envContext = new EnvironmentContext(warehouseOptions);
    }
    dropTable(catName, dbName, tableName, deleteData, ignoreUnknownTable, envContext);
}
Although there are several overloads, they all end up calling the same method:
/**
* Drop the table and choose whether to: delete the underlying table data;
* throw if the table doesn't exist; save the data in the trash.
*
* @param catName catalog name
* @param dbname database name
* @param name table name
* @param deleteData
* delete the underlying data or just delete the table in metadata
* @param ignoreUnknownTab
* don't throw if the requested table doesn't exist
* @param envContext
* for communicating with thrift
* @throws MetaException
* could not drop table properly
* @throws NoSuchObjectException
* the table wasn't found
* @throws TException
* a thrift communication error occurred
* @throws UnsupportedOperationException
* dropping an index table is not allowed
* @see org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.Iface#drop_table(java.lang.String,
* java.lang.String, boolean)
*/
public void dropTable(String catName, String dbname, String name, boolean deleteData,
        boolean ignoreUnknownTab, EnvironmentContext envContext) throws MetaException, TException,
        NoSuchObjectException, UnsupportedOperationException {
    Table tbl;
    try {
        tbl = getTable(catName, dbname, name);
    } catch (NoSuchObjectException e) {
        if (!ignoreUnknownTab) {
            throw e;
        }
        return;
    }
    HiveMetaHook hook = getHook(tbl);
    if (hook != null) {
        hook.preDropTable(tbl);
    }
    boolean success = false;
    try {
        drop_table_with_environment_context(catName, dbname, name, deleteData, envContext);
        if (hook != null) {
            hook.commitDropTable(tbl, deleteData || (envContext != null
                && "TRUE".equals(envContext.getProperties().get("ifPurge"))));
        }
        success = true;
    } catch (NoSuchObjectException e) {
        if (!ignoreUnknownTab) {
            throw e;
        }
    } finally {
        if (!success && (hook != null)) {
            hook.rollbackDropTable(tbl);
        }
    }
}
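It mainly fetches the table object, calls preDropTable as a kind of pre-commit, removes the table from the metastore with drop_table_with_environment_context, and then calls commitDropTable as the actual commit, with rollbackDropTable on failure. Note that the hook calls only run when getHook returns a non-null hook. On the surface, this 2PC-style flow looks quite rigorous.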
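The pre/commit/rollback shape of this method can be reproduced in a few lines of self-contained Java. Everything below is a simplified stand-in: DropHook mirrors only the three drop callbacks of HiveMetaHook, and the Runnable plays the role of drop_table_with_environment_context. None of it is Hive code.

```java
import java.util.ArrayList;
import java.util.List;

class HookedDrop {
    /** Simplified stand-in for the three drop callbacks of HiveMetaHook. */
    interface DropHook {
        void preDrop(String table);
        void commitDrop(String table);
        void rollbackDrop(String table);
    }

    /** Hook that just records which callbacks fired, in order. */
    static final class RecordingHook implements DropHook {
        final List<String> calls = new ArrayList<>();

        public void preDrop(String table) {
            calls.add("pre");
        }

        public void commitDrop(String table) {
            calls.add("commit");
        }

        public void rollbackDrop(String table) {
            calls.add("rollback");
        }
    }

    /**
     * Runs metastoreDrop between the hook callbacks and rolls the hook back on failure.
     * The hook may be null, matching the null checks in the Hive method.
     */
    static void dropTable(String table, DropHook hook, Runnable metastoreDrop) {
        if (hook != null) {
            hook.preDrop(table);
        }
        boolean success = false;
        try {
            metastoreDrop.run();
            if (hook != null) {
                hook.commitDrop(table);
            }
            success = true;
        } finally {
            if (!success && hook != null) {
                hook.rollbackDrop(table);
            }
        }
    }

    public static void main(String[] args) {
        RecordingHook ok = new RecordingHook();
        dropTable("t", ok, () -> { });
        System.out.println(ok.calls);

        RecordingHook failed = new RecordingHook();
        try {
            dropTable("t", failed, () -> {
                throw new IllegalStateException("metastore down");
            });
        } catch (IllegalStateException expected) {
            // the failure still propagates after the rollback callback has run
        }
        System.out.println(failed.calls);
    }
}
```

The commit callback only fires after the real drop has succeeded, so the hook is a notification mechanism around the drop rather than a coordinator of it.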
Notice that HiveMetaHook is actually an interface:
package org.apache.hadoop.hive.metastore;

/**
 * HiveMetaHook defines notification methods which are invoked as part
 * of transactions against the metastore, allowing external catalogs
 * such as HBase to be kept in sync with Hive's metastore.
 *
 * <p>
 *
 * Implementations can use {@link MetaStoreUtils#isExternalTable} to
 * distinguish external tables from managed tables.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface HiveMetaHook {

    public String ALTER_TABLE_OPERATION_TYPE = "alterTableOpType";

    public List<String> allowedAlterTypes = ImmutableList.of("ADDPROPS", "DROPPROPS");

    /**
     * Called before a table definition is removed from the metastore
     * during DROP TABLE.
     *
     * @param table table definition
     */
    public void preDropTable(Table table) throws MetaException;

    /**
     * Called after failure removing a table definition from the metastore
     * during DROP TABLE.
     *
     * @param table table definition
     */
    public void rollbackDropTable(Table table) throws MetaException;

    /**
     * Called after successfully removing a table definition from the metastore
     * during DROP TABLE.
     *
     * @param table table definition
     *
     * @param deleteData whether to delete data as well; this should typically
     * be ignored in the case of an external table
     */
    public void commitDropTable(Table table, boolean deleteData) throws MetaException;
}
Looking at its implementations in the inheritance hierarchy, it is obviously not this one:
package org.apache.hadoop.hive.metastore;

public abstract class DefaultHiveMetaHook implements HiveMetaHook {

    /**
     * Called after successfully INSERT [OVERWRITE] statement is executed.
     * @param table table definition
     * @param overwrite true if it is INSERT OVERWRITE
     *
     * @throws MetaException
     */
    public abstract void commitInsertTable(Table table, boolean overwrite) throws MetaException;

    /**
     * called before commit insert method is called
     * @param table table definition
     * @param overwrite true if it is INSERT OVERWRITE
     *
     * @throws MetaException
     */
    public abstract void preInsertTable(Table table, boolean overwrite) throws MetaException;

    /**
     * called in case pre commit or commit insert fail.
     * @param table table definition
     * @param overwrite true if it is INSERT OVERWRITE
     *
     * @throws MetaException
     */
    public abstract void rollbackInsertTable(Table table, boolean overwrite) throws MetaException;
}
And it certainly cannot be this mock class used for tests:
/**
* Mock class used for unit testing.
* {@link org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2#testLockingOnInsertIntoNonNativeTables()}
*/
public class StorageHandlerMock extends DefaultStorageHandler {
}
So the implementation followed here is the AccumuloStorageHandler class:
package org.apache.hadoop.hive.accumulo;

/**
 * Create table mapping to Accumulo for Hive. Handle predicate pushdown if necessary.
 */
public class AccumuloStorageHandler extends DefaultStorageHandler
        implements HiveMetaHook, HiveStoragePredicateHandler {
}
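However:
@Override
public void preDropTable(Table table) throws MetaException {
    // do nothing
}
It does nothing!!! Words fail me. This 2PC-style flow certainly looks rigorous on the surface.
So for this storage handler, the real entry point of dropTable is: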
@Override
public void commitDropTable(Table table, boolean deleteData) throws MetaException {
    String tblName = getTableName(table);
    if (!isExternalTable(table)) {
        try {
            if (deleteData) {
                TableOperations tblOpts = connectionParams.getConnector().tableOperations();
                if (tblOpts.exists(tblName)) {
                    tblOpts.delete(tblName);
                }
            }
        } catch (AccumuloException e) {
            throw new MetaException(StringUtils.stringifyException(e));
        } catch (AccumuloSecurityException e) {
            throw new MetaException(StringUtils.stringifyException(e));
        } catch (TableNotFoundException e) {
            throw new MetaException(StringUtils.stringifyException(e));
        }
    }
}
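Taking the simplest case, a managed (internal) table whose data must be deleted, what actually gets called is this delete method. And TableOperations is yet another interface: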
package org.apache.accumulo.core.client.admin;

/**
 * Provides a class for administering tables
 *
 */
public interface TableOperations {

    /**
     * Delete a table
     *
     * @param tableName
     *          the name of the table
     * @throws AccumuloException
     *           if a general error occurs
     * @throws AccumuloSecurityException
     *           if the user does not have permission
     * @throws TableNotFoundException
     *           if the table does not exist
     */
    void delete(String tableName)
        throws AccumuloException, AccumuloSecurityException, TableNotFoundException;
}
The inheritance hierarchy is simple, so it has to be this implementation class:
package org.apache.accumulo.core.client.impl;

public class TableOperationsImpl extends TableOperationsHelper {

    @Override
    public void delete(String tableName)
            throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
        checkArgument(tableName != null, "tableName is null");

        List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes(UTF_8)));
        Map<String, String> opts = new HashMap<>();

        try {
            doTableFateOperation(tableName, TableNotFoundException.class,
                FateOperation.TABLE_DELETE, args, opts);
        } catch (TableExistsException e) {
            // should not happen
            throw new AssertionError(e);
        }
    }
}
So the actual entry point is the doTableFateOperation method here. The enum constant FateOperation.TABLE_DELETE has the value 2.
Tracing the call stack of doTableFateOperation
Jump to:
private void doTableFateOperation(String tableOrNamespaceName,
        Class<? extends Exception> namespaceNotFoundExceptionClass, FateOperation op,
        List<ByteBuffer> args, Map<String, String> opts) throws AccumuloSecurityException,
        AccumuloException, TableExistsException, TableNotFoundException {
    try {
        doFateOperation(op, args, opts, tableOrNamespaceName);
    }
    // the catch clauses are not shown in this excerpt
}
Keep following:
String doFateOperation(FateOperation op, List<ByteBuffer> args, Map<String, String> opts,
        String tableOrNamespaceName) throws AccumuloSecurityException, TableExistsException,
        TableNotFoundException, AccumuloException, NamespaceExistsException,
        NamespaceNotFoundException {
    return doFateOperation(op, args, opts, tableOrNamespaceName, true);
}
Keep following:
String doFateOperation(FateOperation op, List<ByteBuffer> args, Map<String, String> opts,
        String tableOrNamespaceName, boolean wait) throws AccumuloSecurityException,
        TableExistsException, TableNotFoundException, AccumuloException,
        NamespaceExistsException, NamespaceNotFoundException {
    Long opid = null;

    try {
        opid = beginFateOperation();
        executeFateOperation(opid, op, args, opts, !wait);
        if (!wait) {
            opid = null;
            return null;
        }
        String ret = waitForFateOperation(opid);
        return ret;
    } catch (ThriftSecurityException e) {
        switch (e.getCode()) {
            case TABLE_DOESNT_EXIST:
                throw new TableNotFoundException(null, tableOrNamespaceName,
                    "Target table does not exist");
            case NAMESPACE_DOESNT_EXIST:
                throw new NamespaceNotFoundException(null, tableOrNamespaceName,
                    "Target namespace does not exist");
            default:
                String tableInfo = Tables.getPrintableTableInfoFromName(context.getInstance(),
                    tableOrNamespaceName);
                throw new AccumuloSecurityException(e.user, e.code, tableInfo, e);
        }
    } catch (ThriftTableOperationException e) {
        switch (e.getType()) {
            case EXISTS:
                throw new TableExistsException(e);
            case NOTFOUND:
                throw new TableNotFoundException(e);
            case NAMESPACE_EXISTS:
                throw new NamespaceExistsException(e);
            case NAMESPACE_NOTFOUND:
                throw new NamespaceNotFoundException(e);
            case OFFLINE:
                throw new TableOfflineException(context.getInstance(),
                    Tables.getTableId(context.getInstance(), tableOrNamespaceName));
            default:
                throw new AccumuloException(e.description, e);
        }
    } catch (Exception e) {
        throw new AccumuloException(e.getMessage(), e);
    } finally {
        Tables.clearCache(context.getInstance());
        // always finish table op, even when exception
        if (opid != null)
            try {
                finishFateOperation(opid);
            } catch (Exception e) {
                log.warn(e.getMessage(), e);
            }
    }
}
Something odd shows up here: it catches a whole series of Thrift-related exceptions. Keep following:
// This method is for retrying in the case of network failures; anything else it passes to the caller to deal with
private void executeFateOperation(long opid, FateOperation op, List<ByteBuffer> args,
        Map<String, String> opts, boolean autoCleanUp)
        throws ThriftSecurityException, TException, ThriftTableOperationException {
    while (true) {
        MasterClientService.Iface client = null;
        try {
            client = MasterClient.getConnectionWithRetry(context);
            client.executeFateOperation(Tracer.traceInfo(), context.rpcCreds(), opid, op, args,
                opts, autoCleanUp);
            break;
        } catch (TTransportException tte) {
            log.debug("Failed to call executeFateOperation(), retrying ... ", tte);
            UtilWaitThread.sleep(100);
        } finally {
            MasterClient.close(client);
        }
    }
}
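This infinite loop obtains the client object. But this client is clearly not that simple. The executeFateOperation method it calls cannot even be opened with a click in IDEA; it has to be located by hand.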
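The retry loop above follows a common pattern: retry only on transport-level failures and let everything else propagate. A minimal, self-contained sketch of that pattern follows. It uses java.io.IOException as a stand-in for TTransportException and, unlike the Accumulo code, caps the number of attempts; the names RetryingCaller and TransportCall, the maxAttempts parameter and the sleep value are assumptions made for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

class RetryingCaller {
    /** Hypothetical call that may fail at the transport level. */
    interface TransportCall<T> {
        T call() throws IOException;
    }

    /**
     * Invokes call until it succeeds, retrying only when it throws IOException
     * (the stand-in for a transport failure). Runtime exceptions propagate at once.
     */
    static <T> T callWithRetry(TransportCall<T> call, int maxAttempts, long sleepMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (IOException transportFailure) {
                last = transportFailure;
                if (attempt < maxAttempts) {
                    sleepQuietly(sleepMillis);
                }
            }
        }
        throw new UncheckedIOException("giving up after " + maxAttempts + " attempts", last);
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting to retry", e);
        }
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        String result = callWithRetry(() -> {
            attempts[0]++;
            if (attempts[0] < 3) {
                throw new IOException("connection reset");
            }
            return "done";
        }, 5, 10L);
        System.out.println(result + " after " + attempts[0] + " attempts");
    }
}
```

Capping the attempts is a design choice of this sketch; the original loop retries forever and relies on the master eventually becoming reachable.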
Analyzing the client object
package org.apache.accumulo.core.client.impl;

import com.google.common.net.HostAndPort;

public class MasterClient {
    private static final Logger log = LoggerFactory.getLogger(MasterClient.class);

    public static MasterClientService.Client getConnectionWithRetry(ClientContext context) {
        while (true) {
            MasterClientService.Client result = getConnection(context);
            if (result != null)
                return result;
            UtilWaitThread.sleep(250);
        }
    }
}
Which in turn is actually this:
public static class Client extends FateService.Client implements Iface {
}
So its parent class is:
package org.apache.accumulo.core.master.thrift;

@SuppressWarnings({"unchecked", "serial", "rawtypes", "unused"})
public class FateService {

    public interface Iface {

        public void executeFateOperation(org.apache.accumulo.core.trace.thrift.TInfo tinfo,
            org.apache.accumulo.core.security.thrift.TCredentials credentials, long opid,
            FateOperation op, List<ByteBuffer> arguments, Map<String, String> options,
            boolean autoClean)
            throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException,
            org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException,
            org.apache.thrift.TException;
    }

    public static class Client extends org.apache.thrift.TServiceClient implements Iface {

        // In the generated code this method belongs to the Client class
        public void executeFateOperation(org.apache.accumulo.core.trace.thrift.TInfo tinfo,
            org.apache.accumulo.core.security.thrift.TCredentials credentials, long opid,
            FateOperation op, List<ByteBuffer> arguments, Map<String, String> options,
            boolean autoClean)
            throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException,
            org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException,
            org.apache.thrift.TException {
            send_executeFateOperation(tinfo, credentials, opid, op, arguments, options, autoClean);
            recv_executeFateOperation();
        }
    }
}
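That is why this kind of client object can execute the executeFateOperation method.
Looking at the executeFateOperation method
It has two steps. Going by the names, the question is what send_executeFateOperation sends and what recv_executeFateOperation receives. The sending side is obviously the one to focus on: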
public void send_executeFateOperation(org.apache.accumulo.core.trace.thrift.TInfo tinfo,
        org.apache.accumulo.core.security.thrift.TCredentials credentials, long opid,
        FateOperation op, List<ByteBuffer> arguments, Map<String, String> options,
        boolean autoClean) throws org.apache.thrift.TException {
    executeFateOperation_args args = new executeFateOperation_args();
    args.setTinfo(tinfo);
    args.setCredentials(credentials);
    args.setOpid(opid);
    args.setOp(op);
    args.setArguments(arguments);
    args.setOptions(options);
    args.setAutoClean(autoClean);
    sendBase("executeFateOperation", args);
}
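This send method packs its arguments, including the table name and the operation type [drop table], into an args object and passes it to sendBase.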
package org.apache.thrift;

/**
 * A TServiceClient is used to communicate with a TService implementation
 * across protocols and transports.
 */
public abstract class TServiceClient {

    protected void sendBase(String methodName, TBase<?, ?> args) throws TException {
        sendBase(methodName, args, TMessageType.CALL);
    }

    private void sendBase(String methodName, TBase<?, ?> args, byte type) throws TException {
        oprot_.writeMessageBegin(new TMessage(methodName, type, ++seqid_));
        args.write(oprot_);
        oprot_.writeMessageEnd();
        oprot_.getTransport().flush();
    }
}
Where:
package org.apache.thrift.protocol;

/**
 * Message type constants in the Thrift protocol.
 *
 */
public final class TMessageType {
    public static final byte CALL = 1;
    public static final byte REPLY = 2;
    public static final byte EXCEPTION = 3;
    public static final byte ONEWAY = 4;
}
So the type passed in is actually 1 (CALL). It is used in this constructor:
package org.apache.thrift.protocol;

/**
 * Helper class that encapsulates struct metadata.
 *
 */
public final class TMessage {

    public TMessage(String n, byte t, int s) {
        name = n;
        type = t;
        seqid = s;
    }

    public final String name;
    public final byte type;
    public final int seqid;
}
The other parameter is the generic TBase:
package org.apache.thrift;

import java.io.Serializable;

import org.apache.thrift.protocol.TProtocol;

/**
 * Generic base interface for generated Thrift objects.
 *
 */
public interface TBase<T extends TBase<?, ?>, F extends TFieldIdEnum>
        extends Comparable<T>, Serializable {

    /**
     * Reads the TObject from the given input protocol.
     *
     * @param iprot Input protocol
     */
    public void read(TProtocol iprot) throws TException;

    /**
     * Writes the objects out to the protocol
     *
     * @param oprot Output protocol
     */
    public void write(TProtocol oprot) throws TException;
}
According to the comments, the write method writes the Java object out to the protocol.
As for the executeFateOperation_args class:
public static class executeFateOperation_args
        implements org.apache.thrift.TBase<executeFateOperation_args, executeFateOperation_args._Fields>,
        java.io.Serializable, Cloneable, Comparable<executeFateOperation_args> {

    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
        schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
    }
}
Its write method delegates to:
package org.apache.thrift.scheme;

import org.apache.thrift.TBase;

public interface IScheme<T extends TBase> {

    public void read(org.apache.thrift.protocol.TProtocol iproto, T struct)
        throws org.apache.thrift.TException;

    public void write(org.apache.thrift.protocol.TProtocol oproto, T struct)
        throws org.apache.thrift.TException;
}
Once again we land on an interface.
Its hierarchy shows two main abstract classes.
And what getScheme returns:
package org.apache.thrift.protocol;

import java.nio.ByteBuffer;

import org.apache.thrift.TException;
import org.apache.thrift.scheme.IScheme;
import org.apache.thrift.scheme.StandardScheme;
import org.apache.thrift.transport.TTransport;

/**
 * Protocol interface definition.
 *
 */
public abstract class TProtocol {

    public Class<? extends IScheme> getScheme() {
        return StandardScheme.class;
    }

    public abstract void writeMessageBegin(TMessage message) throws TException;
}
Clearly what we get is the StandardScheme class. And writeMessageBegin is an abstract method of this abstract class.
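The lookup schemes.get(oprot.getScheme()).getScheme().write(oprot, this) is a map-based dispatch: the protocol names a scheme class, and the struct looks up a factory for it. The sketch below imitates that shape with deliberately tiny, hypothetical types (Scheme, Protocol, Args). It is not Thrift's generated code, and the two toy schemes differ only in how they render the fields as text.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

class SchemeDispatch {
    /** Toy counterpart of IScheme: knows how to write one struct type. */
    interface Scheme {
        void write(StringBuilder out, Args struct);
    }

    static final class StandardScheme implements Scheme {
        public void write(StringBuilder out, Args struct) {
            out.append("table=").append(struct.table).append(";op=").append(struct.op);
        }
    }

    static final class TupleScheme implements Scheme {
        public void write(StringBuilder out, Args struct) {
            out.append(struct.table).append(',').append(struct.op);
        }
    }

    /** Toy counterpart of TProtocol: it only reports which scheme class it wants. */
    static class Protocol {
        Class<? extends Scheme> getScheme() {
            return StandardScheme.class;
        }
    }

    /** Toy counterpart of a generated args struct. */
    static final class Args {
        private static final Map<Class<? extends Scheme>, Supplier<Scheme>> SCHEMES =
            new HashMap<>();

        static {
            SCHEMES.put(StandardScheme.class, StandardScheme::new);
            SCHEMES.put(TupleScheme.class, TupleScheme::new);
        }

        final String table;
        final int op;

        Args(String table, int op) {
            this.table = table;
            this.op = op;
        }

        /** Same shape as the generated write(): pick the scheme by protocol, then delegate. */
        void write(Protocol protocol, StringBuilder out) {
            SCHEMES.get(protocol.getScheme()).get().write(out, this);
        }
    }

    public static void main(String[] args) {
        StringBuilder out = new StringBuilder();
        new Args("tb_name", 2).write(new Protocol(), out);
        System.out.println(out);
    }
}
```

A protocol that overrides getScheme to return TupleScheme.class would switch the encoding without the struct's write method changing at all.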
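The inheritance hierarchy of this abstract class shows which protocols are supported natively. The most commonly used is of course the binary protocol, TBinaryProtocol.
Looking at the TBinaryProtocol binary protocol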
package org.apache.thrift.protocol;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;

import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransport;

/**
 * Binary protocol implementation for thrift.
 *
 */
public class TBinaryProtocol extends TProtocol {

    public void writeMessageBegin(TMessage message) throws TException {
        if (strictWrite_) {
            int version = VERSION_1 | message.type;
            writeI32(version);
            writeString(message.name);
            writeI32(message.seqid);
        } else {
            writeString(message.name);
            writeByte(message.type);
            writeI32(message.seqid);
        }
    }
}
As you can see, writeMessageBegin is where data actually gets written: the message is split into its parts and written out.
public void writeString(String str) throws TException {
    try {
        byte[] dat = str.getBytes("UTF-8");
        writeI32(dat.length);
        trans_.write(dat, 0, dat.length);
    } catch (UnsupportedEncodingException uex) {
        throw new TException("JVM DOES NOT SUPPORT UTF-8");
    }
}
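Put together, writeMessageBegin and writeString produce a very plain byte layout. The sketch below rebuilds the strict-mode message header with nothing but java.nio, assuming the strictWrite_ branch shown above and big-endian 32-bit integers. VERSION_1 and VERSION_MASK carry the values used by Thrift's TBinaryProtocol; the class and method names are made up for the example, and the struct body that follows the header on the wire is not covered.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class BinaryMessageHeader {
    static final int VERSION_1 = 0x80010000;
    static final int VERSION_MASK = 0xffff0000;

    /** Strict-mode header: i32 (version | type), i32 name length, name bytes, i32 seqid. */
    static byte[] encode(String name, byte type, int seqid) {
        byte[] utf8 = name.getBytes(StandardCharsets.UTF_8);
        // ByteBuffer is big-endian by default, matching the network byte order assumed here
        ByteBuffer buf = ByteBuffer.allocate(4 + 4 + utf8.length + 4);
        buf.putInt(VERSION_1 | type);
        buf.putInt(utf8.length);
        buf.put(utf8);
        buf.putInt(seqid);
        return buf.array();
    }

    /** Reads the method name back out of a strict-mode header. */
    static String decodeName(byte[] header) {
        ByteBuffer buf = ByteBuffer.wrap(header);
        int versionAndType = buf.getInt();
        if ((versionAndType & VERSION_MASK) != VERSION_1) {
            throw new IllegalArgumentException("not a strict binary-protocol message");
        }
        byte[] utf8 = new byte[buf.getInt()];
        buf.get(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] header = encode("executeFateOperation", (byte) 1, 1);
        System.out.println(header.length + " bytes, method " + decodeName(header));
    }
}
```

Nothing in this layout is specific to Java: any language that can pack integers and UTF-8 bytes into a buffer can produce or parse the same header.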
Taking writeString as an example: the data ends up being written out as a byte array through the transport:
package org.apache.thrift.transport;

import java.io.Closeable;

/**
 * Generic class that encapsulates the I/O layer. This is basically a thin
 * wrapper around the combined functionality of Java input/output streams.
 *
 */
public abstract class TTransport implements Closeable {

    /**
     * Reads up to len bytes into buffer buf, starting at offset off.
     *
     * @param buf Array to read into
     * @param off Index to start reading at
     * @param len Maximum number of bytes to read
     * @return The number of bytes actually read
     * @throws TTransportException if there was an error reading data
     */
    public abstract int read(byte[] buf, int off, int len) throws TTransportException;

    /**
     * Writes up to len bytes from the buffer.
     *
     * @param buf The output data buffer
     * @param off The offset to start writing from
     * @param len The number of bytes to write
     * @throws TTransportException if there was an error writing data
     */
    public abstract void write(byte[] buf, int off, int len) throws TTransportException;
}
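This is the object that really does the transmission. In its inheritance hierarchy,
anyone who has done embedded development will recognize the Socket immediately!!! It is the very same IP + port socket, the abstraction layer between the application layer and the TCP/IP transport layer.
Looking at the TIOStreamTransport transport class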
package org.apache.thrift.transport;

/**
 * This is the most commonly used base transport. It takes an InputStream
 * and an OutputStream and uses those to perform all transport operations.
 * This allows for compatibility with all the nice constructs Java already
 * has to provide a variety of types of streams.
 *
 */
public class TIOStreamTransport extends TTransport {

    public int read(byte[] buf, int off, int len) throws TTransportException {
        if (inputStream_ == null) {
            throw new TTransportException(TTransportException.NOT_OPEN,
                "Cannot read from null inputStream");
        }
        int bytesRead;
        try {
            bytesRead = inputStream_.read(buf, off, len);
        } catch (IOException iox) {
            throw new TTransportException(TTransportException.UNKNOWN, iox);
        }
        if (bytesRead < 0) {
            throw new TTransportException(TTransportException.END_OF_FILE);
        }
        return bytesRead;
    }

    /**
     * Writes to the underlying output stream if not null.
     */
    public void write(byte[] buf, int off, int len) throws TTransportException {
        if (outputStream_ == null) {
            throw new TTransportException(TTransportException.NOT_OPEN,
                "Cannot write to null outputStream");
        }
        try {
            outputStream_.write(buf, off, len);
        } catch (IOException iox) {
            throw new TTransportException(TTransportException.UNKNOWN, iox);
        }
    }

    /**
     * Flushes the underlying output stream if not null.
     */
    public void flush() throws TTransportException {
        if (outputStream_ == null) {
            throw new TTransportException(TTransportException.NOT_OPEN,
                "Cannot flush null outputStream");
        }
        try {
            outputStream_.flush();
        } catch (IOException iox) {
            throw new TTransportException(TTransportException.UNKNOWN, iox);
        }
    }
}
Its subclass TSocket supplies the host, the port, the initialization logic, and so on.
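Stripped of Thrift's exception types, a stream-backed transport is a very thin wrapper. The following sketch is a simplified imitation of TIOStreamTransport, not the real class: StreamTransport and roundTrip are names invented here, java.io.IOException replaces TTransportException, and in-memory byte streams stand in for the streams one would normally obtain from a java.net.Socket via getInputStream() and getOutputStream().

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class StreamTransport {
    private final InputStream in;
    private final OutputStream out;

    StreamTransport(InputStream in, OutputStream out) {
        this.in = in;
        this.out = out;
    }

    /** Reads up to len bytes; end of stream is reported as an exception. */
    int read(byte[] buf, int off, int len) throws IOException {
        if (in == null) {
            throw new IOException("Cannot read from null inputStream");
        }
        int bytesRead = in.read(buf, off, len);
        if (bytesRead < 0) {
            throw new IOException("end of stream");
        }
        return bytesRead;
    }

    void write(byte[] buf, int off, int len) throws IOException {
        if (out == null) {
            throw new IOException("Cannot write to null outputStream");
        }
        out.write(buf, off, len);
    }

    void flush() throws IOException {
        if (out == null) {
            throw new IOException("Cannot flush null outputStream");
        }
        out.flush();
    }

    /** Sends payload through one transport and reads it back through another. */
    static byte[] roundTrip(byte[] payload) {
        try {
            ByteArrayOutputStream wire = new ByteArrayOutputStream();
            StreamTransport sender = new StreamTransport(null, wire);
            sender.write(payload, 0, payload.length);
            sender.flush();

            StreamTransport receiver =
                new StreamTransport(new ByteArrayInputStream(wire.toByteArray()), null);
            byte[] buf = new byte[payload.length];
            int off = 0;
            while (off < buf.length) {
                off += receiver.read(buf, off, buf.length - off);
            }
            return buf;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] echoed = roundTrip("drop".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(echoed, StandardCharsets.UTF_8));
    }
}
```

The read loop in roundTrip matters with real sockets, where a single read may return fewer bytes than requested.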
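Summary of the drop table flow
At this point we know roughly how operating Hive through the Java API works:
Top-level API [dropTable] → the delete method of the table operations implementation class [TableOperationsImpl], which calls doTableFateOperation
→ the executeFateOperation method → the executeFateOperation method of the Client instance
→ the sendBase method → the write method of the executeFateOperation_args static class instance, which hands the data to the protocol, TProtocol
→ the protocol's write methods, which actually write the data out to the Thrift server
→ the Thrift server receives the message and performs the corresponding operation
The best-known Thrift servers are of course Hive's own Hive Server [standalone] and Hive Server2, plus Spark's Thrift Server. With them, Hive can be operated through JDBC or the CLI.
But!!! Thrift was designed from the start to be language-independent. After all, the bottom layer only needs to get the bytes where they need to go, and moving bytes is not a privilege of Java.
Thrift in other languages
Under the service-rpc path you can find generated packages for cpp, Java, php, py, and rb!!!
The official Hive documentation spells it out: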
https://cwiki.apache.org/confluence/display/Hive/HiveClient#HiveClient-ThriftJavaClient
The command line client currently only supports an embedded server. The JDBC and Thrift-Java clients support both embedded and standalone servers. Clients in other languages only support standalone servers.
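In other words, the command line works only against an embedded server, JDBC and the Thrift Java client work against both embedded and standalone servers, and clients in other languages require a standalone server.
Connection con = DriverManager.getConnection("jdbc:hive://localhost:10000/default", "", "");
Statement stmt = con.createStatement();
This is the ancient, original Hive Server. With a host and port in the URL it connects to a standalone server; according to the same wiki page, embedded mode uses the bare URL jdbc:hive://.
Connection con = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "", "");
And this is Hive Server2, likewise addressed as a standalone deployment.
The official documentation also gives a Python example (written in Python 2 syntax):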
#!/usr/bin/env python

import sys

from hive import ThriftHive
from hive.ttypes import HiveServerException
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

try:
    transport = TSocket.TSocket('localhost', 10000)
    transport = TTransport.TBufferedTransport(transport)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)

    client = ThriftHive.Client(protocol)
    transport.open()

    client.execute("CREATE TABLE r(a STRING, b INT, c DOUBLE)")
    # HiveQL spells this statement LOAD DATA
    client.execute("LOAD DATA LOCAL INPATH '/path' INTO TABLE r")
    client.execute("SELECT * FROM r")
    while (1):
        row = client.fetchOne()
        if (row == None):
            break
        print row
    client.execute("SELECT * FROM r")
    print client.fetchAll()

    transport.close()

except Thrift.TException, tx:
    print '%s' % (tx.message)
And a PHP example:
<?php
// set THRIFT_ROOT to php directory of the hive distribution
$GLOBALS['THRIFT_ROOT'] = '/lib/php/';
// load the required files for connecting to Hive
require_once $GLOBALS['THRIFT_ROOT'] . 'packages/hive_service/ThriftHive.php';
require_once $GLOBALS['THRIFT_ROOT'] . 'transport/TSocket.php';
require_once $GLOBALS['THRIFT_ROOT'] . 'protocol/TBinaryProtocol.php';
// Set up the transport/protocol/client
$transport = new TSocket('localhost', 10000);
$protocol = new TBinaryProtocol($transport);
$client = new ThriftHiveClient($protocol);
$transport->open();

// run queries, metadata calls etc
$client->execute('SELECT * from src');
var_dump($client->fetchAll());
$transport->close();
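For Ruby there is at least a reference: https://github.com/forward3d/rbhive
As for Java and C++, no client examples are given, which is easy to understand. Java has JDBC and the high-level API, so people rarely reach for the low-level API.
For platform or component development, where the low-level API really is needed, veteran Java programmers with their comb-overs can certainly look up the API, fill in the parameters, and get the program running. That much engineering ability they do have.
As for C++ programmers and their formidable wheel-reinventing skills, who knows: just as Redpanda re-created Kafka, one day they may well turn out a C++ version of Hive.
Since Thrift makes things language-independent, calling a component need not be limited to Java or Scala. And reinventing wheels has never been the exclusive right of Java and Scala either.
That is why big data development can break free of Java and Scala.
Closing Remarks
Big data is not converging on SQL. SQL support exists to widen the audience, so that the many business developers with modest technical skills can also benefit from big data technology. SQL is also more productive in the specific scenario of processing structured tables.
But even in that extremely narrow scenario SQL has plenty of shortcomings, although the API approach is not much better.
Building components from scratch and calling existing ones are even more language-independent. A programming language is often just a vehicle for expressing ideas, and only a sufficiently broad technology stack gives you the right to choose.
Please credit the source when reposting: https://lizhiyong.blog.csdn.net/article/details/129742904
Copyright belongs to the original author, 虎鲸不是鱼. If there is any infringement, please contact us for removal.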