于是我驻足,享受无法复刻的一些瞬间
** —— 24.11.9**
一、Filter方法
功能
过滤想要的数据进行保留
语法
基于filter中我们传入的函数,决定rdd对象中哪个保留哪个丢弃
代码
from pyspark import SparkConf,SparkContext
# 设置spark中的python解释器对象
import os
os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 准备一个RDD
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
# 对RDD的数据进行过滤,保留奇数,去除偶数
# 方法1:
def Retain(data):
if data % 2 == 1:
return True
else:
return False
# 对RDD数据进行过滤,留下奇数
rdd1 = rdd.filter(Retain)
print(rdd1.collect())
# 方法2:
rdd2 = rdd.filter(lambda num:num % 2 == 1)
print(rdd2.collect())
总结
filter算子
接受一个处理函数,可用lambda匿名函数快速编写
函数对RDD数据逐个处理,得到True的保留到返回值的RDD中
二、distinct方法
功能
对RDD数据进行去重,返回新RDD
语法
rdd.distinct() # 无需传参
代码
from pyspark import SparkConf,SparkContext
# 设置spark中的python解释器对象
import os
os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 准备一个RDD
rdd = sc.parallelize([1,3,3,4,4,4,7,8,9,9])
rdd = rdd.distinct()
print(rdd.collect())
总结
distinct算子
完成对Rdd内数据的去重操作
三、SortBy方法
功能
对RDD数据进行排序,基于指定的排序依据
语法
rdd.sortBy()
rdd.sortBy(func, ascending = False, numPartitions = 1)
# func:(T) - > U: 告知按照rdd中的哪个数据进行排序,比如 lambda x:x[1] 表示按照rdd中的第二列元素进行排序
# ascending: True升序 False 降序
# numPartitions: 用多少分区排序
代码
from pyspark import SparkConf,SparkContext
# 设置spark中的python解释器对象
import os
os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 读取数据文件
rdd = sc.textFile("D:/2LFE\Desktop\WordCount.txt")
# 取出全部单词
word_rdd = rdd.flatMap(lambda x:x.split(" "))
print(word_rdd.collect())
# 将所有单词都转换成二元元组,单词为key,value设置为1
word_with_one_rdd = word_rdd.map(lambda word:(word,1))
# 分组并求和
result_rdd = word_with_one_rdd.reduceByKey(lambda a,b:a+b)
# 对结果进行排序
result_rdd = result_rdd.sortBy(lambda x:x[1],ascending = False,numPartitions = 1)
# 打印并输出结果
print(result_rdd.collect())
总结
sortBy算子
接收一个处理函数,可用lambda快速编写
函数表示用来决定排序的依据
可以控制升序或降序
全局排序需要设置分区数为1
四、数据计算练习
需求:
复制以上内容到文件中,使用Spark读取文件进行计算:
① 各个城市销售额排名,从大到小
② 全部城市,有哪些商品类别在售卖
③ 北京市有哪些商品类别在售卖
解答
from pyspark import SparkConf,SparkContext
import json
# 设置spark中的python解释器对象
import os
os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 读取文件得到RDD
file_rdd = sc.textFile("E:\python.learning\pyspark\sortBy.txt")
# 取出一个个JSON字符串
json_str_rdd = file_rdd.flatMap(lambda x:x.split("|"))
# 将一个JSON字符串转换为字典 json模块
dict_rdd = json_str_rdd.map(lambda x:json.loads(x))
# 取出城市和销售额数据:(城市,销售额)
city_with_money_rdd = dict_rdd.map(lambda x:(x['areaName'],int(x['money'])))
# 按销售额对结果进行聚合然后根据销售额降序排序
city_result_rdd = city_with_money_rdd.reduceByKey(lambda x,y:x+y)
res1 = city_result_rdd.sortBy(lambda x:x[1],ascending = False,numPartitions = 1)
print("需求1结果:" , res1.collect())
# 需求2 对全部商品进行去重
category_rdd = dict_rdd.map(lambda x: x['category']).distinct()
print("需求2结果:",category_rdd.collect())
# 需求3 过滤北京市的数据
BJ_data_rdd = dict_rdd.filter(lambda x:x['areaName'] == '北京')
print("需求3结果:",BJ_data_rdd.collect())
# 需求4 对北京市的商品类别进行商品类别去重
res2 = BJ_data_rdd.map(lambda x:x['category']).distinct()
print("需求4结果:",res2.collect())
总结
去重函数:
在 PySpark 框架下,
distinct
函数用于返回一个新的 RDD,其中包含原始 RDD 中的不同元素。
过滤函数:
filter
函数用于从弹性分布式数据集(RDD)中筛选出满足特定条件的元素,返回一个新的 RDD 只包含满足条件的元素。
转换函数:
在 PySpark 中,
map
函数是对弹性分布式数据集(RDD)进行转换操作的一种重要方法。
map
函数对 RDD 中的每个元素应用一个函数,返回一个新的 RDD,其中包含应用函数后的结果。
排序函数:
sortBy
函数用于对RDD 中的元素进行排序,它接受一个函数或者一个字段名作为参数,根据这个参数来确定排序的依据。
版权归原作者 L_cl 所有, 如有侵权,请联系我们删除。