一、RDD的介绍
rdd 弹性分布式数据集 是spark框架自己封装的数据类型,用来管理内存数据
数据集:
rdd数据的格式 类似Python中 [] 。 hive中的 该结构[] 叫 数组
rdd提供算子(方法) 方便开发人员进行调用计算数据
在pysaprk中本质是定义一个rdd类型用来管理和计算内存数据
分布式 : rdd可以时使用多台机器的内存资源完成计算
弹性: 可以通过分区将数据分成多份 2 3 4,每份数据对应一个task线程处理
python 也有自己的数据类型 使用的是单机资源管理数据
list 结构: [] 方法 : append pop extend
dict 结构: {k:v} 方法 : items keys values
数据类型可以通过封装类的形式进行定义,所有list,dict在Python中本质是一个类
二、RDD的特点(特性)
将rdd管理的数据分成多份,每份数据会对应一个task,进而提升计算效率
后面的的资源并行度时会讲解如何分区
只读
rdd中的分区只能进行读取不能直接修改,可用通过rdd提供的算子进行计算得到一个新的rdd
依赖
rdd之间有依赖关系,下一个rdd是依赖上一个rdd的计算结果
缓存
可以将rdd进行缓存,节省计算时间
checkpoint机制
保存rdd数据到hdfs,或者其他存储服务上
三、创建RDD数据
通过pyspark进行rdd计算时,需要现将数据转化为rdd
Python数据,文件数据读取后转化为rdd
转化数据的方法在sparkcontext类在中,所有在进行转化先要是生成sparkcontex对象
SparkContext类就算是spark入口类,类中管理了RDD类(生成rdd),SparkConf(配置)类等
3-1 Python转化为rdd
from pyspark import SparkContext
# 使用本地模式
sc = SparkContext()
# Python转化rdd
# parallelize转化rdd数据
# 可以将可迭代对象数据转化为rdd 列表 ,字典,元组,集合,字符串
data_list =[1,2,3]
data_dict = {'a':1,'b':2} # 字典转化时只会取出key值
rdd = sc.parallelize(data_list)
# 查看结果
# collect 获取rdd中的所有数据
res = rdd.collect()
print(res)
3-2 文件数据转化为rdd
from pyspark import SparkContext
sc = SparkContext()
# 读取hdfs文件数据转为rdd数据,实际工作中都是读hdfs文件数据
rdd = sc.textFile('hdfs://node1:8020/data/data.txt')
# 默认读取hdfs,可以简写
rdd2 = sc.textFile('/data/data.txt')
# 读取目录下所有文件数据
rdd3 = sc.textFile('/data')
# 可以读取本地文件,注意:要用本地模式
rdd4 = sc.textFile('file:///root/data.txt')
# 查看rdd数据
res = rdd.collect()
res2 = rdd2.collect()
res3 = rdd3.collect()
print(res)
print(res2)
print(res3)
res4 = rdd4.collect()
print(res4)
3-3 rdd的分区
一个分区对应一个task执行,可以指定分区,提升计算效率
parallelize分区设置
glom() 查看分区信息方法
from pyspark import SparkContext
sc = SparkContext()
# parallelize转化数据时指定分区数
# numSlices 指定分区数
rdd_p = sc.parallelize([1, 2, 3, 4, 5], numSlices=3)
# 查看分区信息
# glom配合collect输出结果
# [[1], [2, 3], [4, 5]]
res = rdd_p.glom().collect()
print(res)
res2=rdd_p.collect()
print(res2)
- textFile分区设置
# textFile转化数据时指定分区数
# minPartitions 指定分区数 该值是最小分区,有可能会多一个分区
rdd_t = sc.textFile('/data/data.txt',minPartitions=3)
# 查看分区信息
# glom配合collect输出结果
res3 = rdd_t.glom().collect()
print(res3)
- 没有指定,默认分区
# 没有指定使用默认分区
# parallelize没有指定分区,那么分区数和cpu核心一样
rdd_p1 = sc.parallelize([1, 2, 3, 4, 5])
res4 = rdd_p1.glom().collect()
print(res4)
# textFile没有指定分区,方法内默认指定了minPartitions=2
rdd_t1 = sc.textFile('/data/data.txt')
res5 = rdd_t1.glom().collect()
print(res5)
拓展 分区划分过程
parallelize 方法
rdd_p = sc.parallelize([1, 2, 3, 4, 5], numSlices=3)
- 假如列表数据转化rdd分成三个分区,分区编号从0开始 ,通过计算逻辑确定分区的起始和结束位置,进而确定取值范围 - 分区 0 - start (当前分区数0 * 数据元素个数5) / 分区数3 =0- end ((当前分区数0+1)数据元素个数5) / 分区数3 = 1 1.666 除不尽向下取整- 取值范围 [0,1) 左闭右开- 数据就是 [1]- 分区 1 - start (当前分区数1 * 数据元素个数5) / 分区数3 =1 1.666 除不尽向下取整- end ((当前分区数1+1)数据元素个数5) / 分区数3 = 3- 取值范围 [1,3) 左闭右开- 数据就是 [2,3]- 分区 2 - textFile方法- 一个文件有多个block,一个block块对应一个分区数据- 文件只有一个block块- 文件大小/分区数=整数--余数- 整数值作为划分分区的数据大小的值- 余数部分用来确认是否增加一个分区- 71 / 3 = 23--2 - 每个分区的数据大小是23- 2/23=0.08695652173913043 0.08695652173913043*100=8.6% 没有超过10% 不新增一个分区- 分区0 [1,23] 按照行读取文件中第1字符到23个字符 一行的数据不可分割,第一个分区的字符会取到25个字符 - hadoop,hive,spark,flink- 分区1 [25,46] 一行的数据不可分割 - sql,python,hadoop,spark- 分区2 [50,71] - spark,java,scala,java
3-4 小文件数据读取
一个目录下有大量小文件,那么每个小文件数据会对应一个分区数据
小文件过多,就会创建大量分区,每个分区会对应一个task,task计算需要占用资源
wholeTextFiles
将小文件数据合并后再进行分区,默认的最小的分区是2
from pyspark import SparkContext
sc = SparkContext()
# 读取hdfs文件数据转为rdd数据,实际工作中都是读hdfs文件数据
# 读取目录下所有文件数据
rdd3 = sc.textFile('/data')
# 查看分区信息
res=rdd3.glom().collect()
print(res)
# 小文件读取方法
rdd4 = sc.wholeTextFiles('/data')
# 查看分区信息
res=rdd4.glom().collect()
print(res)
四、常用RDD算子
4-1 算子(方法)介绍
rdd中封装了各种算子方便进行计算,主要分为两类
transformation
转化算子 - 主要是对rdd中的每个元素数据进行转化,转为新的rdd- ['hadoop','flink','spark'] --> [('hadoop',1),('flink',1),('spark',1)]- ['hadoop','flink','spark'] --> ['hadoop+itcast','flink+itcast','spark+itcast']
action
执行算子 - 对转化后的进行聚合计算和结果取值输出- 只有在调用执行算子,转化算子业务逻辑才能真正被执行
在rdd中没有字典类型的数据,采用 [(k,v),(k2,v2)]来表示kv数据
4-2 常用transformation算子
map 主要的场景是构造k-v形式 (x,1)
rdd.map(lambda 参数:参数计算)
参数接受每个元素数据
返回的结果没有要求
from pyspark import SparkContext
sc = SparkContext()
# rdd转化时都是采用列表
data_dict = {'a':1,'b':2}
data_list=[]
for k,v in data_dict.items():
data_list.append((k,v))
print(data_list)
rdd = sc.parallelize(data_list)
res =rdd.collect()
print(res)
rdd2 = sc.parallelize([1,2,3,4])
rdd3 = sc.parallelize(['a','b','c','d'])
# 使用map方法转化 rdd数据
# 读取rdd中的每个元素数据进行转化,转化方式需要编写lambda表达式
# lambda x:x+2 表达式需要接受一个参数,该参数获取每个rdd的元素数据
# 转化后返回一个新的rdd
rdd2_map = rdd2.map(lambda x:x+2)
# 转化时要注意接受的参数类型
rdd3_map = rdd3.map(lambda x:x+'+'+'itcast')
# map更多的场景是转化k-v数
rdd4_map = rdd3.map(lambda x:(x,1))
# 查看rdd数据
res = rdd2_map.collect()
print(res)
res3 = rdd3_map.collect()
print(res3)
res4 = rdd4_map.collect()
print(res4)
flatMap
处理的是二维嵌套列表数据 [[1,2,3],[4,5,6],[7,8,9]]
rdd.flatMap(lambda 参数:[参数计算])
返回结果是列表
from pyspark import SparkContext
sc = SparkContext()
# flatMap 也是数据转化,处理的是二维嵌套列表
rdd1 = sc.parallelize([1,2,3,4])
rdd2 = sc.parallelize([[1,2,3],[4,5,6],[7,8,9]])
#lambda x:x 定义一个参数,接受rdd中的每个元素,然后对每个元素进行处理
# rdd不是二维嵌套列表,flatMap处理错误
# rdd = rdd1.flatMap(lambda x:x)
# print(rdd.collect())
rdd_flatMap= rdd2.flatMap(lambda x:x) # 将二维数据转化一维数据
print(rdd_flatMap.collect())
rdd_map = rdd_flatMap.map(lambda x:x*2)
print(rdd_map.collect())
rdd_map2 = rdd2.map(lambda x:x[1])
print(rdd_map2.collect())
fliter
rdd.filter(lambda 参数:参数条件过滤)
条件过滤的书写和Python中if判断一样
from pyspark import SparkContext
sc = SparkContext()
# 生成rdd数据
rdd = sc.parallelize([1,2,3,4])
rdd2 = sc.parallelize([1,2,3,4,None,None])
# fliter过滤数据
# lambda x:x%2==0 接受一个参数,获取rdd中的每个元素数据,进行x%2==0判断,符合条件的数据会返回到新的rdd
rdd_filter = rdd.filter(lambda x:x%2==0)
print(rdd_filter.collect())
rdd_filter2 = rdd2.filter(lambda x:x is not None)
print(rdd_filter2.collect())
distinct 去重
不需要lambda rdd.distinct
from pyspark import SparkContext
sc = SparkContext()
# 生成rdd数据
rdd = sc.parallelize([1, 2, 3, 4, 2, 1])
rdd_kv = sc.parallelize([('a', 1), ('b', 2), ('a', 1),('a', 2)])
# 去重 返回一个新的rdd
rdd_distinct = rdd.distinct()
print(rdd_distinct.collect())
# kv类型的去重
rdd_distinct2 = rdd_kv.distinct()
print(rdd_distinct2.collect())
groupBy 分区
rdd.groupBy(lambda 参数:根据参数编写分区条件)
mapValues(list)
from pyspark import SparkContext
sc = SparkContext()
# 生成rdd数据
rdd = sc.parallelize([1,2,3,4,5,6])
# 分组 lambda x:x%2 接受一个参数,对rdd中的每个元素进行处理,计算规则 x%2
# 对每个元素进行取余,余数相同的数据放在一起
rdd_groupBy = rdd.groupBy(lambda x:x%2)
print(rdd_groupBy.collect())
# mapValues 获取(k,v)结构中的value值
# lambda x:list(x) 接受一个参数,获取vlaue值部分,然后进行处理
rdd_mapValues=rdd_groupBy.mapValues(lambda x:list(x))
print(rdd_mapValues.collect())
rdd_kv = sc.parallelize([('a',1),('b',2)])
rdd_mapValues1=rdd_kv.mapValues(lambda x:x+1)
print(rdd_mapValues1.collect())
k-v数据 [(k,v),(k1,v1)]
groupByKey() - rdd.groupByKey()
reduceByKey() - rdd.reduceByKey(lambda 参数1,参数2:对两个参数计算)
sortByKey() - rdd.sortByKey()
from pyspark import SparkContext
sc = SparkContext()
rdd = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('a', 2)])
# 按照k值分组 不需要lambda 返回一个新的rdd
rdd_groupByKey = rdd.groupByKey()
print(rdd_groupByKey.collect())
rdd_mapValues=rdd_groupByKey.mapValues(lambda x:list(x))
print(rdd_mapValues.collect())
# 分组聚合 相同k值数据放在一起然后聚合
# lambda x,y:x+y
rdd_reduceByKey = rdd.reduceByKey(lambda x,y:x+y)
print(rdd_reduceByKey.collect())
# 按照k值排序 不需要lamda 返回一个新的rdd
rdd_sortByKey= rdd.sortByKey()
print(rdd_sortByKey.collect()) # 默认是升序
rdd_sortByKey2= rdd.sortByKey(ascending=False) # 降序
print(rdd_sortByKey2.collect())
sortBy() 排序
rdd.sortBy(lambda x:x,ascending=False)
from pyspark import SparkContext
sc = SparkContext()
rdd1 = sc.parallelize([1, 4, 3, 2])
# 排序
# lambda x:x 接受一个参数,对rdd中元素数据排序
rdd_sortBy = rdd1.sortBy(lambda x: x)
rdd_sortBy2 = rdd1.sortBy(lambda x: x, ascending=False)
print(rdd_sortBy.collect())
print(rdd_sortBy2.collect())
rdd2 = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('a', 2)])
# k-v排序 x接受每个元素,元素是一个元祖,通过下标确认按照哪个进行排序
rdd_sortBy3 = rdd2.sortBy(lambda x: x[0])
rdd_sortBy4 = rdd2.sortBy(lambda x: x[1])
print(rdd_sortBy3.collect())
print(rdd_sortBy4.collect())
2-4 常用action算子
collect() 取出所有值
rdd.collect() - reduce() 非k-v类型数据累加
rdd.reduce(lambda 参数1,参数2:两个参数计算) - count() 统计rdd元素个数
rdd.count() - take() 取出指定数量值
rdd.take(数量)
from pyspark import SparkContext
sc = SparkContext()
# 生成rdd数据
rdd = sc.parallelize([1, 2, 3, 4])
#action算子
# 获取rdd中所有数据
res = rdd.collect()
print(res)
# 统计rdd的元素个数
res = rdd.count()
print(res)
# 指定获取数据个数 类似mysql中limit
res =rdd.take(2)
print(res)
# 非kv数据聚合
res =rdd.reduce(lambda x,y:x+y)
print(res)
五、案例
词频统计
from pyspark import SparkContext
sc = SparkContext()
# 读取hdfs文件数据转为rdd数据,实际工作中都是读hdfs文件数据
rdd = sc.textFile('hdfs://node1:8020/data/data.txt')
print(rdd.collect())
rdd_map=rdd.map(lambda x:x.split(','))
print(rdd_map.collect())
rdd_flatMap = rdd_map.flatMap(lambda x:x)
print(rdd_flatMap.collect())
rdd_map2=rdd_flatMap.map(lambda x:(x,1))
print(rdd_map2.collect())
rdd_reduceByKey=rdd_map2.reduceByKey(lambda x,y:x+y)
print(rdd_reduceByKey.collect())
rdd_soutBy = rdd_reduceByKey.sortBy(lambda x:x[1],ascending=False)
print(rdd_soutBy.collect())
版权归原作者 陆水A 所有, 如有侵权,请联系我们删除。