spark架构
spark 术语
https://spark.apache.org/docs/latest/cluster-overview.html
Job 一个 action 算子一个job(save、collect 等)
Stage Stage 调度的最小单元,Stage 的划分会产生shuffle。一个Job 由一个或多个Stage
Task 执行的最小单元,一个 Stage 由一个或多个 Task 组成
Parallelism 一个task 就是一个 Parallelism,Parallelism 是逻辑上的一个划分
core core 是实际执行的物理资源,一个core 就是一个线程
Spark on yarn 资源申请
org.apache.spark.deploy.yarn.YarnAllocator
计算公式
spark on yarn 申请资源时,默认需要提供core 和 memory 两种资源:
● core 由 spark.executor.cores 参数指定
● memory 由 spark.executor.memory(堆内存) + spark.executor.memoryOverhead(堆外内存)+ spark.executor.pyspark.memory (默认未设置) + spark.memory.offHeap.size (堆外内存,spark3.x)
spark.executor.memoryOverhead 的参数由 spark.executor.memoryOverheadFactor(默认0.1) 和 spark.executor.memory 决定,最小是384MB。计算公式:max(spark.executor.memory * spark.executor.memoryOverheadFactor, 384)
22G 20G
command: spark-submit --master yarn --queue offline
–jars /data/warehouse/jars/mysql-connector-java-8.0.22.jar
–deploy-mode cluster
–driver-memory 2G
–executor-memory 20G
–conf spark.yarn.executor.memoryOverhead=2G // 4G
–num-executo。s 2
–executor-cores 3
–conf spark.default.parallelism=100
–conf spark.sql.shuffle.partitions=100
–conf spark.sql.broadcastTimeout=3600
–conf spark.network.timeout=3600
–conf spark.shuffle.useOldFetchProtocol=true
–name ads_oa_training_camp_lesson_sales_data_analysis
–class com.jojo.spark.applications.oa.AdsTrainingCampLessonSalesDataAnalysisApp
/data/warehouse/jojo-offline-spark/target/jojo-offline-spark-1.0-SNAPSHOT.jar
–run.env “prod”
memoryOverhead 不足的常见报错
● Container killed by YARN for exceeding virtual memory limits.
● Container killed by YARN for exceeding physical memory limits.
Spark 内存管理
https://spark.apache.org/docs/latest/tuning.html#memory-tuning
统一内存管理
org.apache.spark.memory.UnifiedMemoryManager
On-Heap 内存
Off-Heap 内存
在默认情况下堆外内存并不启用,可通过配置spark.memory.offHeap.enabled参数启用,并由spark.memory.offHeap.size参数设定堆外空间的大小。除了没有 other 空间,堆外内存与堆内内存的划分方式基本相同,所有运行中的并发任务共享存储内存和执行内存。
分片读取
FileSourceScanExec文件读取
spark 在读取文件类型的数据时会根据文件格式、大小、支持分片等因素自动计算source 读取时的分区大小。
计算公式:
defaultMaxSplitBytes = 128MB
openCostInBytes = 4MB
minPartitionNum = defaultParallelism = yarn 申请的core
totalBytes = sum(每个文件大的小 + openCostInBytes)
bytesPerCore = totalBytes / minPartitionNum
maxSplitBytes = min(defaultMaxSplitBytes, max(openCostInBytes,bytesPerCore))
partitionSize = totalBytes / maxSplitBytes
orc、parquet 文件格式不受压缩算法限制,都支持分片,文本文件需要使用支持分片的压缩算法。
版权归原作者 FEARNONR 所有, 如有侵权,请联系我们删除。