0


SparkSQL的UDF大数据量执行结果和HiveSQL的UDF不一致

摘要: SparkSQL的UDF大数据量执行结果和HiveSQL的UDF不一致
关键词: 大数据 SparkSQL UDF 线程安全

文章目录

一、软件版本:

1.Hive版本

Hive 1.2.1000.2.6.5.0-292

2.Spark版本

Spark version 2.3.0.2.6.5.0-292

二、项目场景:

交付项目上基本所有的脚本任务,都是使用Hive脚本的方式生成数据,但是dolphinscheduler的数据质量sql,是基于SparkSQL构建的。
当我们有一些复杂逻辑的时候,我们会编写UDF,我们在Hive里注册UDF,然后在Spark里使用

三、问题描述:

1.UDF代码

packagecom.bigdata.udf.data_quality;importorg.apache.hadoop.hive.ql.exec.Description;importorg.apache.hadoop.hive.ql.exec.UDF;importorg.apache.hadoop.io.Text;importjava.text.SimpleDateFormat;importjava.util.Date;importjava.util.regex.Matcher;importjava.util.regex.Pattern;/**
 * 日期格式检查
 * create function UDFDateCheck as 'com.iflytek.hive.DateCheckUDF' using jar 'hdfs:///user/hive/udf/check_udf-1.0-SNAPSHOT.jar';
 * */@Description(
        name ="dateCheck",
        value ="_FUNC_(date) - check date or datetime format",
        extended ="select dateCheck('2012-05-20') from  dual; \n"+"result is '1' \n"+"select dateCheck('2020-05-20 12:20:56') from dual; \n"+"result is '1' \n"+"select dateCheck('abcd-ef-gh') from dual; \n"+"result is '0'")publicclassDateCheckUDFextends UDF {publicstaticSimpleDateFormat DATE_FORMAT_NUMS =newSimpleDateFormat("yyyyMMdd");publicstaticSimpleDateFormat DATETIME_FORMAT_NUMS =newSimpleDateFormat("yyyyMMddHHmmss");//当前只判断这三种格式publicstaticSimpleDateFormat DATE_FORMAT_SPILT_2Y =newSimpleDateFormat("yy-MM-dd");publicstaticSimpleDateFormat DATE_FORMAT_SPILT_4Y =newSimpleDateFormat("yyyy-MM-dd");publicstaticSimpleDateFormat DATETIME_FORMAT_SPILT =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");/**
     * YYYYMMDD日期 考虑平年闰年,考虑大小月
     */publicstaticfinalPattern pYearMonthDay =Pattern.compile("((\\d{3}[1-9]|\\d{2}[1-9]\\d|\\d[1-9]\\d{2}|[1-9]\\d{3})(((0[13578]|1[02])(0[1-9]|[12]\\d|3[01]))|((0[469]|11)(0[1-9]|[12]\\d|30))|(02(0[1-9]|[1]\\d|2[0-8]))))|(((\\d{2})(0[48]|[2468][048]|[13579][26])|((0[48]|[2468][048]|[3579][26])00))0229)");/**
     * HHmmss时间
     */publicstaticfinalPattern pHourMinuteSecond =Pattern.compile("^(20|21|22|23|[0-1]\\d)[0-5]\\d[0-5]\\d$");privatestaticfinalString result_0 ="0";privatestaticfinalString result_1 ="1";/**
     * 返回值 0 校验不通过
     * */privatestaticfinalText RESULT_0  =newText("0");publicTextevaluate(Text date){returndateTimeCheck(date);}//日期检查publicstaticbooleandateCheck(String dateStr){Matcher matcher = pYearMonthDay.matcher(dateStr);return matcher.matches();}//时间检查HHmmss (HH:mm:ss去掉了冒号)publicstaticbooleantimeCheck(String timeStr){Matcher matcher = pHourMinuteSecond.matcher(timeStr);return matcher.matches();}publicstaticTextdateTimeCheck(Text date){if(date ==null){return RESULT_0;}String input = date.toString();String result ="";switch(input.length()){case8:case10:case14:case19:{try{if(input.length()==8){if(input.contains("-")){Date formatDate = DATE_FORMAT_SPILT_2Y.parse(input);String tmp = input.replaceAll("-","");String numsDate = DATE_FORMAT_NUMS.format(formatDate);if(!numsDate.contains(tmp)){
                                result = result_0;break;}if(dateCheck(numsDate)){
                                result = result_1;}}else{Date numsDate = DATE_FORMAT_NUMS.parse(input);String numsDateStr = DATE_FORMAT_NUMS.format(numsDate);if(dateCheck(numsDateStr)){
                                result =  result_1;}else{
                                result =  result_0;}}}if(input.length()==10){Date formatDate = DATE_FORMAT_SPILT_4Y.parse(input);String tmp = input.replaceAll("-","");String numsDate = DATE_FORMAT_NUMS.format(formatDate);if(!numsDate.equals(tmp)){
                            result = result_0;break;}if(dateCheck(tmp)){
                            result =  result_1;}else{
                            result =  result_0;}}if(input.length()==14){Date numsDate = DATETIME_FORMAT_NUMS.parse(input);String numsDateStr = DATETIME_FORMAT_NUMS.format(numsDate);String dateStr = numsDateStr.substring(0,8);String timeStr = numsDateStr.substring(8);if(dateCheck(dateStr)&&timeCheck(timeStr)){
                            result = result_1;}else{
                            result =  result_0;}}if(input.length()==19){Date formatDate = DATETIME_FORMAT_SPILT.parse(input);String tmp = input.replaceAll("-","").replaceAll(" ","").replaceAll(":","");String numsDate = DATETIME_FORMAT_NUMS.format(formatDate);if(!numsDate.equals(tmp)){
                            result = result_0 ;break;}String dateStr = tmp.substring(0,8);String timeStr = tmp.substring(8);if(dateCheck(dateStr)&&timeCheck(timeStr)){
                            result = result_1;}}break;}catch(Exception e){
                    result = result_0;break;}}default:{
                result = result_0;}}returnnewText(result);}}

2.注册UDF

代码很长,主要是判断各种日期格式,然后在Hive的default库里注册UDF为UDFDateCheck函数,然后因为Spark可以直接读取Hive的Metastore,所以也可以在Spark的default库使用UDFDateCheck函数

3.执行SQL

select UDFDateCheck(update_time),update_time 
  from u_ods.ods_u_st_qjysjzx_vh_check
WHERE1=1and update_time isnotnulland update_time<>''and UDFDateCheck(update_time)=0limit10;

1).HiveSQL里执行

返回结果为空

2).在SparkSQL里执行

返回结果

1    20230630083553
0    20231228103659
0    20231226104024
0    20231227161317
1    20231119003340
1    20230105230002
1    20231119003149
1    20231220154554
1    20230321140113
1    20231226132732

且多次执行,第一列返回不同,UDFDateCheck(update_time)有时为1有时为0,而我的筛选条件是UDFDateCheck(update_time) = 0,这里已经异常了

四、原因分析:

1.分析原因可能是Spark内存不够

提升Spark内存之后,仍然没有效果

2.分析Spark和Hive特点

1).Spark UDF特点

当在Spark中使用UDF时,同一个UDF实例可能会被不同的线程调用,特别是在进行数据分区处理时。
这意味着如果UDF中存在共享状态或全局变量,并且没有正确地管理这些资源的话,就可能导致线程安全问题。
如果UDF中包含非线程安全的操作,例如修改静态变量或实例变量,那么在高并发的情况下(如处理大数据量时),就可能会遇到数据竞争条件(race conditions),从而导致错误的结果或异常

数据竞争条件解释

数据竞争条件指的是,在并发环境中,多个线程试图访问并修改同一份数据,但由于缺乏适当的同步控制,导致数据处于不一致的状态。这种情况通常发生在以下几个场景中:

共享静态变量:如果UDF修改了一个静态变量,并且这个变量没有被适当地同步,那么多个线程同时访问和修改这个变量时,可能会导致不可预测的行为。

实例变量:如果UDF是一个类的方法,并且该方法修改了类的实例变量,而这些实例变量又是在多个线程间共享的,那么同样可能会引发数据竞争。

外部资源:如果UDF需要访问一些外部资源(如数据库连接、文件系统等),并且这些资源的访问没有被正确地同步,也可能会导致数据竞争条件。

2).Hive UDF特点

Hive是一个基于磁盘的数据仓库技术,它的查询执行引擎更倾向于批处理模型。在 Hive中,每个UDF通常在一个单独的JVM进程中运行,这意味着即使UDF不是线程安全的,也不会像在Spark中那样容易出现问题,因为每个查询都会创建一个新的UDF实例

3.分析结果总结

因为Spark是线程不安全的,所以如果UDF使用了非线程安全的操作,那么就会导致不可预测行为,Hive是每个UDF在单独的JVM里执行,就会好很多。
从我们代码中可以看出我们使用了线程不安全的操作

publicstaticSimpleDateFormat DATE_FORMAT_NUMS =newSimpleDateFormat("yyyyMMdd");

SimpleDateFormat类在Java中是非线程安全的。这是因为SimpleDateFormat的状态可能会被多个线程同时访问和修改,特别是在使用format和parse方法时,如果多个线程同时调用同一个SimpleDateFormat实例,就有可能导致数据错误或异常行为

五、解决方案:

1.解决方案列举

1).避免共享状态:

尽量使UDF成为纯函数,即UDF的输出只依赖于输入参数,而不依赖于任何外部状态。

2).使用线程安全的数据结构:

如果必须使用共享资源,确保使用线程安全的数据结构或实现同步机制(如 ThreadLocal等)来保护共享资源。

3).利用Spark的累加器:

如果需要收集中间结果,可以使用Spark的累加器,这是一种只读于任务但写入于驱动程序的特殊变量,它们可以安全地在多个任务之间共享。

4).利用广播变量:

对于只读的共享变量,可以使用 Spark 的广播变量来减少数据的复制次数,提高性能,但需注意广播变量也是线程安全的。

2.解决方案选择

方案1需要修改太多,方案3和方案4需要去写Spark代码
所以最终选择方案2
使用ThreadLocal是最高效且线程安全的方法

privatestaticfinalThreadLocal<SimpleDateFormat> DATE_FORMAT_NUMS =ThreadLocal.withInitial(()->{returnnewSimpleDateFormat("yyyyMMdd");});

这里只举例修改,把代码中涉及SimpleDateFormat的全部使用了ThreadLocal
修改完成之后在Spark里执行UDF也能得到和Hive一样的结果了

六、总结:

编写UDF的时候,要注意线程安全的问题,尤其是在SparkSQL会使用这些UDF的时候

标签: spark sql hive

本文转载自: https://blog.csdn.net/pengpenhhh/article/details/142757732
版权归原作者 鹏说大数据 所有, 如有侵权,请联系我们删除。

“SparkSQL的UDF大数据量执行结果和HiveSQL的UDF不一致”的评论:

还没有评论