0


由spark.sql.shuffle.partitions混洗分区浅谈下spark的分区

背景

spark的分区无处不在,但是编程的时候又很少直接设置,本文想通过一个例子说明从spark读取数据到内存中后的分区数,然后经过shuffle操作后的分区数,最后再通过主动设置repartition函数时生成的分区数,把数据从读取到写出过程中的分区数做个总结

分析

首先我们写一段读取目录下的csv文件,对Dataframe进行shuffle操作,聚合操作后把数据写到另外一个目录中的代码来进行分析

from __future__ importprint_functionimportsys

from pyspark importSparkConf
from pyspark.sql importSparkSessionif __name__ =="__main__":
    appname ="test"  # 任务名称
    master ="local[*]"  # 单机模式设置
    # conf =SparkConf().setAppName(appname).setMaster(master).set("spark.driver.host", spark_driver_host) # 集群
    conf =SparkConf().setAppName(appname).setMaster(master).set("spark.sql.shuffle.partitions",50)  # 本地
    spark =SparkSession.builder.config(conf=conf).getOrCreate()
    # get the M&M data set file name
    mnm_file = '''E:\学习\Spark快速大数据分析2E-随书代码包\chapter2\py\src\data\*.csv'''
    # read the file into a SparkDataFrame
    mnm_df =(spark.read.format("csv").option("header","true").option("inferSchema","true").load(mnm_file))print(mnm_df.rdd.getNumPartitions())

    # aggregate count of all colors and groupBy state and color
    # orderBy descending order
    count_mnm_df =(mnm_df.select("State","Color","Count").groupBy("State","Color").sum("Count").orderBy("sum(Count)", ascending=False))print(count_mnm_df.rdd.getNumPartitions())
    count_mnm_df.write.format('csv').option("header","true").mode("overwrite").save("""E:\学习\data\csv""")
    repartition_mnm = count_mnm_df.repartition(20)print(repartition_mnm.rdd.getNumPartitions())
    spark.stop()

最终结果输出如下:
输入数据分区数:mnm_df.rdd.getNumPartitions()=3
shuffle分区数:count_mnm_df.rdd.getNumPartitions()=50
repartition分区数:repartition_mnm.rdd.getNumPartitions()=20

分析如下:
1.源目录由3个csv文件,所以spark在读取这个目录下的文件时分成了三个分区读取,也就是由三个task分别读取不同的csv文件
2.对读取后的数据进行聚合shuffle操作后数据被分成了50个分区,这个数量是我们使用spark.sql.shuffle.partitions进行配置的,默认是200个,也就是spark使用了50个task进行并行操作
3.对shuffle聚合操作之后的dataframe结果保存到最终目录,spark此时会使用50个task分别写不同的分区文件,最终结果可以发现总共有50个csv子文件生成

总结

一. spark读取文件数据的时候生成多少个分区一般由文件数量决定的,不过如果读取hdfs文件,生成的分区数一般由该文件拥有多少个数据块确定,然后spark在进行数据shuffle操作时生成的分区数由spark.sql.shuffle.partitions参数决定
二. 每个分区数只能对应一个spark的任务数,每个spark任务数对应一个核心,为了能充分利用集群的cpu机器资源,我们的分区数只有大于等于spark总的核心数量是才能达到充分利用集群cpu机器资源的效果
三 如何调整分区数:我们可以使用repartition或者colease方法调整DataFrame的分区数量

标签: spark sql 大数据

本文转载自: https://blog.csdn.net/lixia0417mul2/article/details/127947048
版权归原作者 lixia0417mul2 所有, 如有侵权,请联系我们删除。

“由spark.sql.shuffle.partitions混洗分区浅谈下spark的分区”的评论:

还没有评论