0


PySpark 速查表

本文总计 1000 字,预计阅读需要 3 -5分钟

在本文中,我将介绍datacamp的这份Pyspark的速查表

初始化 Spark 环境

每当我们要使用 Spark 时,首先需要初始化 Spark 环境。

SparkContext:Spark Driver将使用 SparkContext 与集群连接和通信,并与资源管理系统协调作业。

SparkSession:从 Spark 2.0 开始,Spark Session 构建了通向不同数据源的网关,例如 SQL 或 Hive。在 Spark 1.x 中,我们需要定义 SQLContext 或 HiveContext 来与相应的源进行通信。

总之,如果我们想使用 Spark Dataframe 或 Dataset,我们需要定义 SparkSession,而SparkContext 则只能处理 RDDs

import numpy as np
import pandas as pd
from pyspark import SparkContext, StorageLevel, SparkConf
from pyspark.sql import SparkSession

conf = SparkConf()
conf.set("spark.driver.memory", "4g")
conf.set("spark.executor.memory", "4g")

sc = SparkContext.getOrCreate(conf)
spark = SparkSession.builder.getOrCreate()

加载数据

可以使用可迭代数据或外部数据源创建 rdd。这里将movielen 数据文件作为文本字符串加载,然后再进行操作。

注意:在访问rdd.toDF()函数之前需要定义SparkSession,否则会抛出异常。

data_path = '../data/ml-100k/'
data = [('apple', 10),('orange', 20),('peach', 100)]

rdd = sc.parallelize(data)
rdd_list = sc.parallelize(range(100))
rdd_movie = sc.textFile(data_path+"u.data")

# to create the Dataframe, we can use rdd.toDF(), but the sparkSession need to be defined beforehand
dfFromRDD1 = rdd.toDF()
dfFromRDD1.show()

+------+---+
|    _1| _2|
+------+---+
| apple| 10|
|orange| 20|
| peach|100|
+------+---+

数据检索

通过使用以下命令,我们能够从 rdd.txt 中的数据中检索信息。

getNumPartitions()

count(), countByKey(), countByValue()

collect(), collectAsMap(), take(num), top(num)

max(), min(), mean(), stdev(), variance(), histogram(num of bin), stats()

rdd_movie.take(3)>> ['196\t242\t3\t881250949', '186\t302\t3\t891717742', '22\t377\t1\t878887116']

数据处理

在 PySpark 中处理数据可能会让你想起 Pandas Dataframe。与 Pandas 类似,PySpark 也提供了分组、聚合、排序和归约的功能。但是功能相似但执行方式不同。

map 和 flatMap

map 和 flatMap 看起来很相似,但它们是不同的,值得特别注意。

map 函数能够保持原始数据形状的列表结构,flatMap将列表结构解包,形成一个大列表数据结构。

在以下单元格中,我们将通过使用适当的分隔符分隔 rdd_movie rdd 来可视化差异。

# map 
# # output: 
# [['196', '242', '3', '881250949'],
#  ['186', '302', '3', '891717742'],
#  ['22', '377', '1', '878887116']]
rdd_movie.map(lambda x: x.split("\t")).take(3)

# flatMap 
# # output
# ['196', '242', '3', '881250949', '186', '302', '3', '891717742']
rdd_movie.flatMap(lambda x: x.split("\t")).take(8)

groupBy 与 groupByKey

groupBy 函数将根据输入中函数的结果对数据进行分组,groupByKey 将根据原始 rdd 的键对数据进行分组。

# 我们将通过每 10个ID对用户分隔来显示 group By 功能
# 例如, [0,10,20 ..] 用户 ID 将在同一组中

groupby_sample = rdd_movie.map(lambda x: x.split("\t")) \
                            .groupBy(lambda x: int(x[0]) % 10).take(1)

# 打印列表中的 4 个数据
for x,y in groupby_sample:
    print(x)
    for val in list(y)[:4]:
        print(val)

        
# 为了说明group by key的功能,我们先把数据转换成(K, V) 对。
# 在此示例中,将评级定义为key并按评级分组
groupbykey_sample = rdd_movie.map(lambda x: x.split("\t")) \
                            .keyBy(lambda x: x[2]) \
                            .groupByKey().take(1)

# 打印列表中的 4 个数据
for x,y in groupbykey_sample:
    print(x)
    for val in list(y)[:4]:
        print(val)

结果如下:

groupby result:userid last digit = 6
['196', '242', '3', '881250949']
['186', '302', '3', '891717742']
['166', '346', '1', '886397596']
['6', '86', '3', '883603013']
groupByKey result:rating = 1
['22', '377', '1', '878887116']
['166', '346', '1', '886397596']
['181', '1081', '1', '878962623']
['276', '796', '1', '874791932']

reduce 与 reduceByKey

首先需要需要注意到reduce 是一个动作,而reduceByKey 是转换。

原文档定义:https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html

reduce:使用函数 func(它接受两个参数并返回一个)聚合数据集的元素。该函数应该是可交换的和关联的,以便它可以被正确地并行计算。

reduceByKey:当在 (K, V) 对的数据集上调用时,返回一个 (K, V) 对的数据集,其中每个键的值使用给定的 reduce 函数 func 聚合,其类型必须是 (V, V) => V。就像在 groupByKey 中一样,reduce 任务的数量可以通过可选的第二个参数进行配置。

############### reduce ###############
rdd.reduce(lambda a,b: a+b)
# output: 
# ('apple', 10, 'orange', 20, 'peach', 100)

############### reduceByKey ###############
user_rating_count_pair = rdd_movie.map(lambda x: x.split("\t")) \
                            .map(lambda x: tuple([x[0], (int(x[2]), 1)]))

user_sum_rating_count_pair = user_rating_count_pair.reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1]))
user_avg_rating = user_sum_rating_count_pair.map(lambda x: tuple([x[0], x[1][0]/ x[1][1]]))
user_avg_rating.take(3)

# output: 
# >> [('22', 3.3515625), ('244', 3.6512605042016806), ('115', 3.9347826086956523)]

MovieLen 数据集上的示例

我们将使用 PySpark RDD 处理电影的数据,并检索相应的流派。

data_path = '../data/ml-100k/'

# 读取数据
rdd_movie = sc.textFile(data_path+"u.item")
rdd_genre = sc.textFile(data_path+"u.genre")

rdd_movie = rdd_movie.map(lambda x: x.split("|"))
genre_map = rdd_genre.map(lambda x: x.split("|")) \
                    .map(lambda x: tuple([int(x[1]),x[0]])).collectAsMap()

#  u.items
# movie id | movie title | release date | video release date |
# IMDb URL | unknown | Action | Adventure | Animation |
# Children's | Comedy | Crime | Documentary | Drama | Fantasy |
# Film-Noir | Horror | Musical | Mystery | Romance | Sci-Fi |
# Thriller | War | Western |

# 数据转换成 [movieid, movie_name, list of genre]
def process_movie_data(data, genre_map):
    list_genre = []
    for i, ix in enumerate(data):
        # the genre columns start at index 5
        genre_idx = i - 5
        if genre_idx >= 0:
            if ix == '1':
                list_genre.append(genre_map[genre_idx])
    
    res = [data[0], data[1], list_genre]
    return res

rdd_movie_detail = rdd_movie.map(lambda x: process_movie_data(x, genre_map))
rdd_movie_detail.take(2)
标签: Spark Pyspark Python

“PySpark 速查表”的评论:

还没有评论