0


Hive的UDF开发之向量化表达式(VectorizedExpressions)

1. 背景

笔者的大数据平台XSailboat的SailWorks模块包含离线分析功能。离线分析的后台实现,包含调度引擎、执行引擎、计算引擎和存储引擎。计算和存储引擎由Hive提供,调度引擎和执行引擎由我们自己实现。调度引擎根据DAG图和调度计划,安排执行顺序,监控执行过程。执行引擎接收调度引擎安排的任务,向Yarn申请容器,在容器中执行具体的任务。

我们的离线分析支持编写Hive的UDF函数,打包上传,并声明使用函数。
在这里插入图片描述
我们通常会通过继承org.apache.hadoop.hive.ql.udf.generic.GenericUDF来自定义自己的UDF函数,再参考Hive实现的内置UDF函数时,经常会看到在它的类名上,有@VectorizedExpressions注解,翻译过来即“向量化表达式”。在此记录一下自己学习到的知识和理解。

官方文档《Vectorized Query Execution》
有以下应该至少知道的点:

  1. 向量化查询缺省是关闭的;
  2. 要能支持向量化查询,数据存储格式必需是ORC格式(我们主要是用CSV格式)。

通常所说的向量化计算主要是从以下几个方面提升效率:

  1. 利用CPU底册指令对向量的运算
  2. 利用多核/多线程的能力进行并发计算

而Hive的向量化执行,主要是代码逻辑聚合并充分利用上下文,减少判断次数,减少对象的访问处理和序列化次数,数据切块并行。

2. 实践

packagecom.cimstech.udf.date;importjava.io.UnsupportedEncodingException;importjava.text.ParseException;importjava.text.SimpleDateFormat;importjava.util.Arrays;importorg.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;importorg.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;importorg.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;importorg.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.Descriptor;importorg.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;importorg.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;importorg.apache.hadoop.hive.ql.metadata.HiveException;importcom.cimstech.xfront.common.excep.WrapException;importcom.cimstech.xfront.common.text.XString;publicclassVectorUDFStringToTimstampextendsVectorExpression{privatestaticfinallong serialVersionUID =1L;/**
     * 列序号
     */int mColNum0 ;/**
     * 时间格式
     */String mDateFmt ;transientSimpleDateFormat mSdf ;/**
     * 必需得有1个无参的构造函数.        <br />
     * hive会先通过无参构造函数创建一个实例,然后调用getDescriptor()方法,取得描述。
     * 通过描述知道有哪几列,分别是什么格式的,才知道怎么调用有参构造函数。
     */publicVectorUDFStringToTimstamp(){super();}/**
     * 有参构造函数的参数要和getDescriptor中取得的描述相对应。
     * Column类型的输入,在此用int类型列序号表示            <br />
     * 标量列直接是相应类型即可。                        
     * @param aColNum0
     * @param aDateFmt
     * @param aOutputColumnNum
     */publicVectorUDFStringToTimstamp(int aColNum0 ,String aDateFmt,int aOutputColumnNum){super(aOutputColumnNum);
        mColNum0 = aColNum0 ;
        mDateFmt = aDateFmt ;}@OverridepublicStringvectorExpressionParameters(){returngetColumnParamString(0, mColNum0)+" , val "+ mDateFmt ;}privatevoidsetDatetime(TimestampColumnVector aTimestampColVector,byte[][] aVector,int aElementNum)throwsHiveException{if(mSdf ==null)
            mSdf =newSimpleDateFormat(mDateFmt);String dateStr =null;try{
            dateStr =newString(aVector[aElementNum],"UTF-8");
            aTimestampColVector.getScratchTimestamp().setTime(mSdf.parse(dateStr).getTime());}catch(UnsupportedEncodingException e){WrapException.wrapThrow(e);return;// dead code}catch(ParseException e){thrownewHiveException(XString.msgFmt("时间字符串[{}]无法按模式[{}]解析!", dateStr , mDateFmt));}
        aTimestampColVector.setFromScratchTimestamp(aElementNum);}@Overridepublicvoidevaluate(VectorizedRowBatch aBatch)throwsHiveException{if(childExpressions !=null){evaluateChildren(aBatch);}int n = aBatch.size;if(n ==0)return;BytesColumnVector inputColVector =(BytesColumnVector) aBatch.cols[mColNum0];TimestampColumnVector outputColVector =(TimestampColumnVector) aBatch.cols[outputColumnNum];boolean[] inputIsNull = inputColVector.isNull;boolean[] outputIsNull = outputColVector.isNull;byte[][] vector = inputColVector.vector;if(inputColVector.isRepeating){// 如果是重复的,那么只需要解析第1个就行if(inputColVector.noNulls ||!inputIsNull[0]){
                outputIsNull[0]=false;setDatetime(outputColVector, vector,0);}else{// 重复,且都是null,那么没有可解析的,如下设置即可
                outputIsNull[0]=true;
                outputColVector.noNulls =false;}
            outputColVector.isRepeating =true;return;}else
            outputColVector.isRepeating =false;if(inputColVector.noNulls)// 没有为null的{// selectedInUse为true,表示选中输入中的指定行进行处理。if(aBatch.selectedInUse){int[] sel = aBatch.selected;if(!outputColVector.noNulls)// 全局被标为了有null值,那么各个为止都需要单独设置是否为null{for(int j =0; j != n; j++){finalint i = sel[j];
                        outputIsNull[i]=false;// 某一行,单独设置不为nullsetDatetime(outputColVector, vector, i);}}else{for(int j =0; j != n; j++){finalint i = sel[j];// 全局被标为了没有null值,那么无需一行行标注非nullsetDatetime(outputColVector, vector, i);}}}else{// 输入是全局没有null值的,输出被全局标为了有null值,那么把输出改过来,改为全局没有null值if(!outputColVector.noNulls){Arrays.fill(outputIsNull,false);// 所有输出都非null
                    outputColVector.noNulls =true;// 改为全局没有null值}for(int i =0; i != n; i++){setDatetime(outputColVector, vector, i);}}}else// 输入数据是有null的{
            outputColVector.noNulls =false;if(aBatch.selectedInUse){int[] sel = aBatch.selected;for(int j =0; j != n; j++){int i = sel[j];
                    outputIsNull[i]= inputIsNull[i];if(!outputIsNull[i])setDatetime(outputColVector, vector, i);}}else{System.arraycopy(inputIsNull,0, outputIsNull,0, n);for(int i =0; i != n; i++){if(!outputIsNull[i])setDatetime(outputColVector, vector, i);}}}}@OverridepublicDescriptorgetDescriptor(){return(newVectorExpressionDescriptor.Builder())// 不是过滤,都认为是投影(Projection)。投影是数据库理论中的专业术语// 投影是根据输入,构造输出,填充输出列// 过滤就是设置aBatch.selected.setMode(VectorExpressionDescriptor.Mode.PROJECTION).setNumArguments(2).setArgumentTypes(VectorExpressionDescriptor.ArgumentType.STRING,VectorExpressionDescriptor.ArgumentType.STRING).setInputExpressionTypes(VectorExpressionDescriptor.InputExpressionType.COLUMN,VectorExpressionDescriptor.InputExpressionType.SCALAR)// 标量,指定的字符串常量,就是标量.build();}}

本文转载自: https://blog.csdn.net/OkGogooXSailboat/article/details/136181946
版权归原作者 OkGogooXSailboat 所有, 如有侵权,请联系我们删除。

“Hive的UDF开发之向量化表达式(VectorizedExpressions)”的评论:

还没有评论