本文总计 1000 字,预计阅读需要 3 -5分钟
在本文中,我将介绍datacamp的这份Pyspark的速查表
初始化 Spark 环境
每当我们要使用 Spark 时,首先需要初始化 Spark 环境。
SparkContext:Spark Driver将使用 SparkContext 与集群连接和通信,并与资源管理系统协调作业。
SparkSession:从 Spark 2.0 开始,Spark Session 构建了通向不同数据源的网关,例如 SQL 或 Hive。在 Spark 1.x 中,我们需要定义 SQLContext 或 HiveContext 来与相应的源进行通信。
总之,如果我们想使用 Spark Dataframe 或 Dataset,我们需要定义 SparkSession,而SparkContext 则只能处理 RDDs
import numpy as np
import pandas as pd
from pyspark import SparkContext, StorageLevel, SparkConf
from pyspark.sql import SparkSession
conf = SparkConf()
conf.set("spark.driver.memory", "4g")
conf.set("spark.executor.memory", "4g")
sc = SparkContext.getOrCreate(conf)
spark = SparkSession.builder.getOrCreate()
加载数据
可以使用可迭代数据或外部数据源创建 rdd。这里将movielen 数据文件作为文本字符串加载,然后再进行操作。
注意:在访问rdd.toDF()函数之前需要定义SparkSession,否则会抛出异常。
data_path = '../data/ml-100k/'
data = [('apple', 10),('orange', 20),('peach', 100)]
rdd = sc.parallelize(data)
rdd_list = sc.parallelize(range(100))
rdd_movie = sc.textFile(data_path+"u.data")
# to create the Dataframe, we can use rdd.toDF(), but the sparkSession need to be defined beforehand
dfFromRDD1 = rdd.toDF()
dfFromRDD1.show()
+------+---+
| _1| _2|
+------+---+
| apple| 10|
|orange| 20|
| peach|100|
+------+---+
数据检索
通过使用以下命令,我们能够从 rdd.txt 中的数据中检索信息。
getNumPartitions()
count(), countByKey(), countByValue()
collect(), collectAsMap(), take(num), top(num)
max(), min(), mean(), stdev(), variance(), histogram(num of bin), stats()
rdd_movie.take(3)>> ['196\t242\t3\t881250949', '186\t302\t3\t891717742', '22\t377\t1\t878887116']
数据处理
在 PySpark 中处理数据可能会让你想起 Pandas Dataframe。与 Pandas 类似,PySpark 也提供了分组、聚合、排序和归约的功能。但是功能相似但执行方式不同。
map 和 flatMap
map 和 flatMap 看起来很相似,但它们是不同的,值得特别注意。
map 函数能够保持原始数据形状的列表结构,flatMap将列表结构解包,形成一个大列表数据结构。
在以下单元格中,我们将通过使用适当的分隔符分隔 rdd_movie rdd 来可视化差异。
# map
# # output:
# [['196', '242', '3', '881250949'],
# ['186', '302', '3', '891717742'],
# ['22', '377', '1', '878887116']]
rdd_movie.map(lambda x: x.split("\t")).take(3)
# flatMap
# # output
# ['196', '242', '3', '881250949', '186', '302', '3', '891717742']
rdd_movie.flatMap(lambda x: x.split("\t")).take(8)
groupBy 与 groupByKey
groupBy 函数将根据输入中函数的结果对数据进行分组,groupByKey 将根据原始 rdd 的键对数据进行分组。
# 我们将通过每 10个ID对用户分隔来显示 group By 功能
# 例如, [0,10,20 ..] 用户 ID 将在同一组中
groupby_sample = rdd_movie.map(lambda x: x.split("\t")) \
.groupBy(lambda x: int(x[0]) % 10).take(1)
# 打印列表中的 4 个数据
for x,y in groupby_sample:
print(x)
for val in list(y)[:4]:
print(val)
# 为了说明group by key的功能,我们先把数据转换成(K, V) 对。
# 在此示例中,将评级定义为key并按评级分组
groupbykey_sample = rdd_movie.map(lambda x: x.split("\t")) \
.keyBy(lambda x: x[2]) \
.groupByKey().take(1)
# 打印列表中的 4 个数据
for x,y in groupbykey_sample:
print(x)
for val in list(y)[:4]:
print(val)
结果如下:
groupby result:userid last digit = 6
['196', '242', '3', '881250949']
['186', '302', '3', '891717742']
['166', '346', '1', '886397596']
['6', '86', '3', '883603013']
groupByKey result:rating = 1
['22', '377', '1', '878887116']
['166', '346', '1', '886397596']
['181', '1081', '1', '878962623']
['276', '796', '1', '874791932']
reduce 与 reduceByKey
首先需要需要注意到reduce 是一个动作,而reduceByKey 是转换。
原文档定义:https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html
reduce:使用函数 func(它接受两个参数并返回一个)聚合数据集的元素。该函数应该是可交换的和关联的,以便它可以被正确地并行计算。
reduceByKey:当在 (K, V) 对的数据集上调用时,返回一个 (K, V) 对的数据集,其中每个键的值使用给定的 reduce 函数 func 聚合,其类型必须是 (V, V) => V。就像在 groupByKey 中一样,reduce 任务的数量可以通过可选的第二个参数进行配置。
############### reduce ###############
rdd.reduce(lambda a,b: a+b)
# output:
# ('apple', 10, 'orange', 20, 'peach', 100)
############### reduceByKey ###############
user_rating_count_pair = rdd_movie.map(lambda x: x.split("\t")) \
.map(lambda x: tuple([x[0], (int(x[2]), 1)]))
user_sum_rating_count_pair = user_rating_count_pair.reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1]))
user_avg_rating = user_sum_rating_count_pair.map(lambda x: tuple([x[0], x[1][0]/ x[1][1]]))
user_avg_rating.take(3)
# output:
# >> [('22', 3.3515625), ('244', 3.6512605042016806), ('115', 3.9347826086956523)]
MovieLen 数据集上的示例
我们将使用 PySpark RDD 处理电影的数据,并检索相应的流派。
data_path = '../data/ml-100k/'
# 读取数据
rdd_movie = sc.textFile(data_path+"u.item")
rdd_genre = sc.textFile(data_path+"u.genre")
rdd_movie = rdd_movie.map(lambda x: x.split("|"))
genre_map = rdd_genre.map(lambda x: x.split("|")) \
.map(lambda x: tuple([int(x[1]),x[0]])).collectAsMap()
# u.items
# movie id | movie title | release date | video release date |
# IMDb URL | unknown | Action | Adventure | Animation |
# Children's | Comedy | Crime | Documentary | Drama | Fantasy |
# Film-Noir | Horror | Musical | Mystery | Romance | Sci-Fi |
# Thriller | War | Western |
# 数据转换成 [movieid, movie_name, list of genre]
def process_movie_data(data, genre_map):
list_genre = []
for i, ix in enumerate(data):
# the genre columns start at index 5
genre_idx = i - 5
if genre_idx >= 0:
if ix == '1':
list_genre.append(genre_map[genre_idx])
res = [data[0], data[1], list_genre]
return res
rdd_movie_detail = rdd_movie.map(lambda x: process_movie_data(x, genre_map))
rdd_movie_detail.take(2)