0


pyspark自定义UDF函数

当遇到一些复杂特殊的计算场景时,只通过pyspark的内置函数无法达到我们想要实现的效果,此时,可通过自定义函数然后注册为UDF函数,就能够很好的解决复杂计算场景问题,且计算效率非常快速。

# 配置spark接口import os
import findspark
from pyspark import SparkConf
from pyspark.sql import SparkSession
os.environ["JAVA_HOME"]="/usr/local/jdk1.8.0_192"
findspark.init("/usr/local/hadoop/spark-2.4.4-bin-hadoop2.6/")# 设置内存大小
conf = SparkConf()# conf.set('fs.defaultFS','hdfs://dmns1') # 指定文件路径
conf.set("spark.driver.memory","4g")
conf.set("spark.executor.memory","4g")
conf.set("spark.port.maxRetries","64")# 设置可以绑定的最大端口数
spark = SparkSession.builder.appName("udf_spark").master("local[*]").enableHiveSupport().config(conf=conf).getOrCreate()
spark.sparkContext.setLogLevel('ERROR')
# 导入数据类型from pyspark.sql.types import IntegerType,DoubleType,StringType
# 实例化自定义类
ov = OfferValue()# 自定义函数 计算复杂场景defmy_function(offer):
    offer_list = offer.split(',')
    offer_value = ov.get_value(offer_list)return offer_value

# 注册为udf函数 命名为my_function
spark.udf.register('my_function',my_function, DoubleType())# sqlAPI调用udf函数
spark.sql("create table database.table1 stored as parquet as \
select my_function(offer_list)as avg_value \
from database.table2")# 关闭spark接口
spark.stop()print(f"计算完成")

计算5000多万数据,仅需一分钟不到,效率非常高。

标签: python spark 分布式

本文转载自: https://blog.csdn.net/LLMUZI123456789/article/details/136290932
版权归原作者 灯下夜无眠 所有, 如有侵权,请联系我们删除。

“pyspark自定义UDF函数”的评论:

还没有评论