一、PySpark应用背景
大规模结构化数据处理要求;
scala编写函数对开发人员接受度低,python的高度简化代码完美契合数据处理过程;
和scala处理无任何性能上的差异;
二、PySpark原理
Spark是什么:
分布式(集群部署),高性能(基于内存可缓存磁盘),高可用的基于RDD(分区的不可变的弹性分布数据集)有向无环数据处理引擎。
Spark现在常用的应用:Spark Sql
Spark sql是什么:
为结构化的数据处理提供sql支持,spark通过catalyst优化引擎对sql进行分解,逻辑及硬件优化解析,最后将sql通过code generation转化为可执行代码。总的来说spark sql引用了sql来处理HDFS上面结构化数据,隔离了代码,开发成本极低,学习成本=mysql学习成本。
为什么要有PySpark:
PySpark(python+spark)Spark支持开发语言有python,为什么不用呢?scala上手过于麻烦,且scala是基于jvm的类java语言,外行很难上手。
PySpark中的应用实例:
Pyspark Sql+python代码,亦可以结合pandas各种交叉开发。
三、Pyspark实战
无需多言直接贴代码
1、spark实例创建,可作为工具类直接引入实现,以下为spark应用实例初始化。(这一步骤在spark集群内部手动封装了,无需频繁创建,详情见:关于Spark应用公共配置封装到PySpark)
from pyspark.sql import SparkSession
def create_spark_session(app_name):
生产用
spark = (SparkSession.builder
.appName(app_name) #应用名称:可写可不写,自己理解
.enableHiveSupport() #hive支持
.config(“spark.sql.parquet.compression.codec”, “snappy”) #config为参数优化
.config(“hive.exec.dynamic.partition”, “true”)
.config(“hive.exec.dynamic.partition.mode”, “nonstrict”)
.config(“parquet.compression”, “SNAPPY”)
.config(“hive.exec.max.dynamic.partitions”, “3000”)
.config(“parquet.enable.dictionary”, “false”)
.config(“hive.support.concurrency”, “true”)
.config(“spark.sql.hive.convertMetastoreParquet”, “false”)
.config(“spark.sql.parquet.writeLegacyFormat”, “true”)
.config(“spark.sql.shuffle.partitions”, 500)
.config(“spark.default.parallelism”, 100)
.config(“spark.storage.memoryFraction”, 0.5)
.config(“spark.reducer.maxSizeInFlight”, “128”)
.config(“spark.shuffle.memoryFraction”, 0.3)
.config(“spark.shuffle.file.buffer”, “32m”)
.config(“spark.shuffle.sort.bypassMergeThreshold”, “300”)
.config(“spark.shuffle.io.maxRetries”, “60”)
.config(“spark.shuffle.io.retryWait”, “60s”)
.config(“spark.shuffle.consolidateFiles”, “true”)
.getOrCreate()) #通过Sparksession创建实例
本地用
spark = (SparkSession.builder
.enableHiveSupport()
.appName(“MyApp”)
.config(“spark.driver.memory”, “8g”) #本地执行,资源分配
.config(“spark.executor.memory”, “8g”)
.config(“spark.executor.cores”, “10”)
.getOrCreate())
return spark
2、引用实例,操作写sql
import create_spark_session #引入上一步实例
spark = create_spark_session(“pj_js”) #创建实例,pj_js为应用名,自定义。
#无需多言,写sql,spark中写sql需要用spark.sql(“sql内容”).来包围
#.show展示数据,show()中可以带参数100,100为展示100行
spark.sql(“select * from **”).show(100)
#创建临时表为:
spark.sql(“select * from **”).createOrReplaceTempView(“jg_table”)
#临时表可查询
spark.sql(“select * from jg_table”).show(100)
#临时表可生成DataFrame
val jgDF = spark.sql(“select * from jg_table”).ToDF(“nsrsbh”,“nsrmc”,“djrq”)
#DataFrame创建临时表
jgDF.createOrReplaceTempView(“ls_table”)
#DataFrame选取一列作为参数传入python自定义函数如:
result = findCombinations(values_list, jgDF.select(“hkje”).collect()[0][0])
#DataFrame引入pandas进行操作
未完待续。。。
3、如何运行?
通过spark submit提交到大数据集群(因为有spark集群),集群通过yarn管理和调度资源,因为是用了spark,资源会就近原则,就近计算节点分区数据,高性能!。
提交脚本参考:
sudo -u hive /opt/spark/bin/spark-submit --master spark://centos2:7077 --driver-memory=4G --executor-memory 4g --conf spark.cores.max=28 /home/app/your_python_file.py
四、PySpark如何各种工具交叉操作数据
未完待续。。。
五、注意
1、实际生产加上这个,因为每台服务器上面的集群一开始没配置完善,导致部分集群spark中python环境不一致
import os
os.environ[“PYSPARK_PYTHON”]=“/var/lib/hive/anaconda3/bin/python3”
os.environ[“PYSPARK_DRIVER_PYTHON”]=“/var/lib/hive/anaconda3/bin/python3”
2、PySpark日志管理:INFO,DEBUG,ERROR
spark内置日志管理:
spark.sparkContext.setLogLevel("INFO")
或者python日志管理下:
import logging
logging.basicConfig(level=logging.INFO, format=‘%(asctime)s - %(name)s - %(levelname)s - %(message)s’)
log.info(‘这是一条日志提醒你代码运行成功,数据库已初始化’)
3、由于我对pyspark实战第一步创建实例函数进行了封装,所以代码中无需重复创建create_spark_session函数。实际生产代码为:
coding:utf-8
from pyspark.create_session import create_spark_session
spark = create_spark_session(“**”)
spark.sql(“select * from **”).show(10)
4、
六、总结
简洁明了的Python语言 + 简单的Sql + 高性能的spark计算引擎 + 任务提交的简单可靠(不用打成jar包,代码vi就能修改操作)
处理点数据还是没多大问题
spark中的spark streaming没有实战过,但是也是支持实时流的,区别flink一个是触发型一个是非触发型数据流处理。
!!!待完善中…
版权归原作者 muwfm 所有, 如有侵权,请联系我们删除。