文章目录
一、Spark RDD
RDD:resilient distributed dataset (RDD)
每个spark程序都有一个driver program运行main函数,在cluster集群上执行各种并行操作。我们也可以将RDD持久化到内存,便于在并行操作中重用。RDD 是 Spark 最基本的数据抽象,它是只读的、分区记录的集合,支持并行操作,可以由外部数据集或其他 RDD 转换而来,它具有以下特性:
- 一个 RDD 由一个或者多个分区(Partitions)组成。对于 RDD 来说,每个分区会被一个计算任务所处理,用户可以在创建 RDD 时指定其分区个数,如果没有指定,则默认采用程序所分配到的 CPU 的核心数;
- RDD 拥有一个用于计算分区的函数 compute;
- RDD 会保存彼此间的依赖关系,RDD 的每次转换都会生成一个新的依赖关系,这种 RDD 之间的依赖关系就像流水线一样。在部分分区数据丢失后,可以通过这种依赖关系重新计算丢失的分区数据,而不是对 RDD 的所有分区进行重新计算;
- Key-Value 型的 RDD 还拥有 Partitioner(分区器),用于决定数据被存储在哪个分区中,目前 Spark 中支持 HashPartitioner(按照哈希分区) 和 RangeParationer(按照范围进行分区);
- 一个优先位置列表 (可选),用于存储每个分区的优先位置 (prefered location)。对于一个 HDFS 文件来说,这个列表保存的就是每个分区所在的块的位置,按照“移动数据不如移动计算”的理念,Spark 在进行任务调度的时候,会尽可能的将计算任务分配到其所要处理数据块的存储位置。
二、使用RDD functions完成任务2的统计逻辑
from pyspark.sql import SparkSession
from pyspark import SparkFiles
import pandas as pd
spark = SparkSession.builder.appName('pyspark').getOrCreate()
spark.sparkContext.addFile('https://cdn.coggle.club/Pokemon.csv')
df = spark.read.csv("file://"+SparkFiles.get("Pokemon.csv"), header=True, inferSchema=True)
df = df.withColumnRenamed('Sp. Atk','SpAtk')
df = df.withColumnRenamed('Sp. Def','SpDef')
df = df.withColumnRenamed('Type 1','Type1')
df = df.withColumnRenamed('Type 2','Type2')
df.show()+--------------------+-----+------+-----+---+------+-------+-----+-----+-----+----------+---------+| Name|Type1| Type2|Total| HP|Attack|Defense|SpAtk|SpDef|Speed|Generation|Legendary|+--------------------+-----+------+-----+---+------+-------+-----+-----+-----+----------+---------+| Bulbasaur|Grass|Poison|318|45|49|49|65|65|45|1| false|| Ivysaur|Grass|Poison|405|60|62|63|80|80|60|1| false|| Venusaur|Grass|Poison|525|80|82|83|100|100|80|1| false||VenusaurMega Venu...|Grass|Poison|625|80|100|123|122|120|80|1| false|| Charmander| Fire| null|309|39|52|43|60|50|65|1| false|| Charmeleon| Fire| null|405|58|64|58|80|65|80|1| false|| Charizard| Fire|Flying|534|78|84|78|109|85|100|1| false||CharizardMega Cha...| Fire|Dragon|634|78|130|111|130|85|100|1| false||CharizardMega Cha...| Fire|Flying|634|78|104|78|159|115|100|1| false|| Squirtle|Water| null|314|44|48|65|50|64|43|1| false|| Wartortle|Water| null|405|59|63|80|65|80|58|1| false|| Blastoise|Water| null|530|79|83|100|85|105|78|1| false||BlastoiseMega Bla...|Water| null|630|79|103|120|135|115|78|1| false|| Caterpie| Bug| null|195|45|30|35|20|20|45|1| false|| Metapod| Bug| null|205|50|20|55|25|25|30|1| false|| Butterfree| Bug|Flying|395|60|45|50|90|80|70|1| false|| Weedle| Bug|Poison|195|40|35|30|20|20|50|1| false|| Kakuna| Bug|Poison|205|45|25|50|25|25|35|1| false|| Beedrill| Bug|Poison|395|65|90|40|45|80|75|1| false||BeedrillMega Beed...| Bug|Poison|495|65|150|40|15|80|145|1| false|+--------------------+-----+------+-----+---+------+-------+-----+-----+-----+----------+---------+
only showing top 20 rows
rdd = df.rdd
cols = df.columns
for i inrange(len(cols)):print('-'*10,cols[i],'-'*10)print('不同的取值个数:',len(dict(rdd.map(lambda x: x[i]).countByValue())))print('空值个数:',rdd.filter(lambda x: x[i]==None).count())"""
---------- Name ----------
不同的取值个数: 799
空值个数: 0
---------- Type1 ----------
不同的取值个数: 18
空值个数: 0
---------- Type2 ----------
不同的取值个数: 19
空值个数: 386
---------- Total ----------
不同的取值个数: 200
空值个数: 0
---------- HP ----------
不同的取值个数: 94
空值个数: 0
---------- Attack ----------
不同的取值个数: 111
空值个数: 0
---------- Defense ----------
不同的取值个数: 103
空值个数: 0
---------- SpAtk ----------
不同的取值个数: 105
空值个数: 0
---------- SpDef ----------
不同的取值个数: 92
空值个数: 0
---------- Speed ----------
不同的取值个数: 108
空值个数: 0
---------- Generation ----------
不同的取值个数: 6
空值个数: 0
---------- Legendary ----------
不同的取值个数: 2
空值个数: 0
"""
Reference
[1] 官方文档RDD Programming Guide
[2] https://blog.csdn.net/qq_56870570/article/details/118177403?spm=1001.2014.3001.5506
[3] Spark入门阶段一之扫盲笔记
[4] (重点)SPARK官方教程系列快速入门
[5] Spark RDD 简介
版权归原作者 山顶夕景 所有, 如有侵权,请联系我们删除。