背景
spark的分区无处不在,但是编程的时候又很少直接设置,本文想通过一个例子说明从spark读取数据到内存中后的分区数,然后经过shuffle操作后的分区数,最后再通过主动设置repartition函数时生成的分区数,把数据从读取到写出过程中的分区数做个总结
分析
首先我们写一段读取目录下的csv文件,对Dataframe进行shuffle操作,聚合操作后把数据写到另外一个目录中的代码来进行分析
from __future__ importprint_functionimportsys
from pyspark importSparkConf
from pyspark.sql importSparkSessionif __name__ =="__main__":
appname ="test" # 任务名称
master ="local[*]" # 单机模式设置
# conf =SparkConf().setAppName(appname).setMaster(master).set("spark.driver.host", spark_driver_host) # 集群
conf =SparkConf().setAppName(appname).setMaster(master).set("spark.sql.shuffle.partitions",50) # 本地
spark =SparkSession.builder.config(conf=conf).getOrCreate()
# get the M&M data set file name
mnm_file = '''E:\学习\Spark快速大数据分析2E-随书代码包\chapter2\py\src\data\*.csv'''
# read the file into a SparkDataFrame
mnm_df =(spark.read.format("csv").option("header","true").option("inferSchema","true").load(mnm_file))print(mnm_df.rdd.getNumPartitions())
# aggregate count of all colors and groupBy state and color
# orderBy descending order
count_mnm_df =(mnm_df.select("State","Color","Count").groupBy("State","Color").sum("Count").orderBy("sum(Count)", ascending=False))print(count_mnm_df.rdd.getNumPartitions())
count_mnm_df.write.format('csv').option("header","true").mode("overwrite").save("""E:\学习\data\csv""")
repartition_mnm = count_mnm_df.repartition(20)print(repartition_mnm.rdd.getNumPartitions())
spark.stop()
最终结果输出如下:
输入数据分区数:mnm_df.rdd.getNumPartitions()=3
shuffle分区数:count_mnm_df.rdd.getNumPartitions()=50
repartition分区数:repartition_mnm.rdd.getNumPartitions()=20
分析如下:
1.源目录由3个csv文件,所以spark在读取这个目录下的文件时分成了三个分区读取,也就是由三个task分别读取不同的csv文件
2.对读取后的数据进行聚合shuffle操作后数据被分成了50个分区,这个数量是我们使用spark.sql.shuffle.partitions进行配置的,默认是200个,也就是spark使用了50个task进行并行操作
3.对shuffle聚合操作之后的dataframe结果保存到最终目录,spark此时会使用50个task分别写不同的分区文件,最终结果可以发现总共有50个csv子文件生成
总结
一. spark读取文件数据的时候生成多少个分区一般由文件数量决定的,不过如果读取hdfs文件,生成的分区数一般由该文件拥有多少个数据块确定,然后spark在进行数据shuffle操作时生成的分区数由spark.sql.shuffle.partitions参数决定
二. 每个分区数只能对应一个spark的任务数,每个spark任务数对应一个核心,为了能充分利用集群的cpu机器资源,我们的分区数只有大于等于spark总的核心数量是才能达到充分利用集群cpu机器资源的效果
三 如何调整分区数:我们可以使用repartition或者colease方法调整DataFrame的分区数量
版权归原作者 lixia0417mul2 所有, 如有侵权,请联系我们删除。