0


Spark核心--RDD介绍

一、RDD的介绍

rdd 弹性分布式数据集 是spark框架自己封装的数据类型,用来管理内存数据

数据集:

rdd数据的格式 类似Python中 [] 。 hive中的 该结构[] 叫 数组

rdd提供算子(方法) 方便开发人员进行调用计算数据

在pysaprk中本质是定义一个rdd类型用来管理和计算内存数据

分布式 : rdd可以时使用多台机器的内存资源完成计算

弹性: 可以通过分区将数据分成多份 2 3 4,每份数据对应一个task线程处理

python 也有自己的数据类型 使用的是单机资源管理数据

list 结构: [] 方法 : append pop extend

dict 结构: {k:v} 方法 : items keys values

数据类型可以通过封装类的形式进行定义,所有list,dict在Python中本质是一个类

二、RDD的特点(特性)

  • 将rdd管理的数据分成多份,每份数据会对应一个task,进而提升计算效率

  • 后面的的资源并行度时会讲解如何分区

  • 只读

  • rdd中的分区只能进行读取不能直接修改,可用通过rdd提供的算子进行计算得到一个新的rdd

  • 依赖

  • rdd之间有依赖关系,下一个rdd是依赖上一个rdd的计算结果

  • 缓存

  • 可以将rdd进行缓存,节省计算时间

  • checkpoint机制

  • 保存rdd数据到hdfs,或者其他存储服务上

三、创建RDD数据

通过pyspark进行rdd计算时,需要现将数据转化为rdd

Python数据,文件数据读取后转化为rdd

转化数据的方法在sparkcontext类在中,所有在进行转化先要是生成sparkcontex对象

SparkContext类就算是spark入口类,类中管理了RDD类(生成rdd),SparkConf(配置)类等

3-1 Python转化为rdd

from pyspark import SparkContext

# 使用本地模式
sc = SparkContext()

# Python转化rdd
# parallelize转化rdd数据
# 可以将可迭代对象数据转化为rdd  列表 ,字典,元组,集合,字符串
data_list =[1,2,3]
data_dict = {'a':1,'b':2}  # 字典转化时只会取出key值
rdd = sc.parallelize(data_list)

# 查看结果
# collect 获取rdd中的所有数据
res = rdd.collect()
print(res)

3-2 文件数据转化为rdd

from pyspark import SparkContext

sc = SparkContext()

# 读取hdfs文件数据转为rdd数据,实际工作中都是读hdfs文件数据
rdd = sc.textFile('hdfs://node1:8020/data/data.txt')
# 默认读取hdfs,可以简写
rdd2 = sc.textFile('/data/data.txt')
# 读取目录下所有文件数据
rdd3 = sc.textFile('/data')

# 可以读取本地文件,注意:要用本地模式
rdd4 = sc.textFile('file:///root/data.txt')

# 查看rdd数据
res = rdd.collect()
res2 = rdd2.collect()
res3 = rdd3.collect()
print(res)
print(res2)
print(res3)

res4 = rdd4.collect()
print(res4)

3-3 rdd的分区

一个分区对应一个task执行,可以指定分区,提升计算效率

  • parallelize分区设置

  • glom() 查看分区信息方法

from pyspark import SparkContext

sc = SparkContext()

# parallelize转化数据时指定分区数
# numSlices 指定分区数
rdd_p = sc.parallelize([1, 2, 3, 4, 5], numSlices=3)

# 查看分区信息
# glom配合collect输出结果
# [[1], [2, 3], [4, 5]]
res = rdd_p.glom().collect()
print(res)

res2=rdd_p.collect()
print(res2)
  • textFile分区设置
# textFile转化数据时指定分区数
# minPartitions 指定分区数 该值是最小分区,有可能会多一个分区
rdd_t = sc.textFile('/data/data.txt',minPartitions=3)

# 查看分区信息
# glom配合collect输出结果
res3 = rdd_t.glom().collect()
print(res3)
  • 没有指定,默认分区
# 没有指定使用默认分区
# parallelize没有指定分区,那么分区数和cpu核心一样
rdd_p1 = sc.parallelize([1, 2, 3, 4, 5])
res4 = rdd_p1.glom().collect()
print(res4)

# textFile没有指定分区,方法内默认指定了minPartitions=2
rdd_t1 = sc.textFile('/data/data.txt')
res5 = rdd_t1.glom().collect()
print(res5)
  • 拓展 分区划分过程

  • parallelize 方法

rdd_p = sc.parallelize([1, 2, 3, 4, 5], numSlices=3)
  • 假如列表数据转化rdd分成三个分区,分区编号从0开始 ,通过计算逻辑确定分区的起始和结束位置,进而确定取值范围 - 分区 0 - start (当前分区数0 * 数据元素个数5) / 分区数3 =0- end ((当前分区数0+1)数据元素个数5) / 分区数3 = 1 1.666 除不尽向下取整- 取值范围 [0,1) 左闭右开- 数据就是 [1]- 分区 1 - start (当前分区数1 * 数据元素个数5) / 分区数3 =1 1.666 除不尽向下取整- end ((当前分区数1+1)数据元素个数5) / 分区数3 = 3- 取值范围 [1,3) 左闭右开- 数据就是 [2,3]- 分区 2 - textFile方法- 一个文件有多个block,一个block块对应一个分区数据- 文件只有一个block块- 文件大小/分区数=整数--余数- 整数值作为划分分区的数据大小的值- 余数部分用来确认是否增加一个分区- 71 / 3 = 23--2 - 每个分区的数据大小是23- 2/23=0.08695652173913043 0.08695652173913043*100=8.6% 没有超过10% 不新增一个分区- 分区0 [1,23] 按照行读取文件中第1字符到23个字符 一行的数据不可分割,第一个分区的字符会取到25个字符 - hadoop,hive,spark,flink- 分区1 [25,46] 一行的数据不可分割 - sql,python,hadoop,spark- 分区2 [50,71] - spark,java,scala,java

3-4 小文件数据读取

一个目录下有大量小文件,那么每个小文件数据会对应一个分区数据

小文件过多,就会创建大量分区,每个分区会对应一个task,task计算需要占用资源

  • wholeTextFiles

  • 将小文件数据合并后再进行分区,默认的最小的分区是2

from pyspark import SparkContext

sc = SparkContext()

# 读取hdfs文件数据转为rdd数据,实际工作中都是读hdfs文件数据
# 读取目录下所有文件数据
rdd3 = sc.textFile('/data')

# 查看分区信息
res=rdd3.glom().collect()

print(res)

# 小文件读取方法
rdd4 = sc.wholeTextFiles('/data')

# 查看分区信息
res=rdd4.glom().collect()

print(res)

四、常用RDD算子

4-1 算子(方法)介绍

rdd中封装了各种算子方便进行计算,主要分为两类

  • transformation

  • 转化算子 - 主要是对rdd中的每个元素数据进行转化,转为新的rdd- ['hadoop','flink','spark'] --> [('hadoop',1),('flink',1),('spark',1)]- ['hadoop','flink','spark'] --> ['hadoop+itcast','flink+itcast','spark+itcast']

  • action

  • 执行算子 - 对转化后的进行聚合计算和结果取值输出- 只有在调用执行算子,转化算子业务逻辑才能真正被执行

在rdd中没有字典类型的数据,采用 [(k,v),(k2,v2)]来表示kv数据

4-2 常用transformation算子

  • map 主要的场景是构造k-v形式 (x,1)

  • rdd.map(lambda 参数:参数计算)

  • 参数接受每个元素数据

  • 返回的结果没有要求

from pyspark import SparkContext

sc = SparkContext()

#  rdd转化时都是采用列表
data_dict = {'a':1,'b':2}
data_list=[]
for k,v in data_dict.items():
    data_list.append((k,v))
print(data_list)

rdd = sc.parallelize(data_list)

res =rdd.collect()
print(res)

rdd2 = sc.parallelize([1,2,3,4])
rdd3 = sc.parallelize(['a','b','c','d'])

# 使用map方法转化 rdd数据
# 读取rdd中的每个元素数据进行转化,转化方式需要编写lambda表达式
# lambda x:x+2 表达式需要接受一个参数,该参数获取每个rdd的元素数据
# 转化后返回一个新的rdd
rdd2_map = rdd2.map(lambda x:x+2)

# 转化时要注意接受的参数类型
rdd3_map = rdd3.map(lambda x:x+'+'+'itcast')

# map更多的场景是转化k-v数
rdd4_map = rdd3.map(lambda x:(x,1))

# 查看rdd数据
res = rdd2_map.collect()
print(res)
res3 = rdd3_map.collect()
print(res3)
res4 = rdd4_map.collect()
print(res4)
  • flatMap

  • 处理的是二维嵌套列表数据 [[1,2,3],[4,5,6],[7,8,9]]

  • rdd.flatMap(lambda 参数:[参数计算])

  • 返回结果是列表

from pyspark import SparkContext

sc = SparkContext()

# flatMap 也是数据转化,处理的是二维嵌套列表
rdd1 = sc.parallelize([1,2,3,4])
rdd2 = sc.parallelize([[1,2,3],[4,5,6],[7,8,9]])
#lambda x:x  定义一个参数,接受rdd中的每个元素,然后对每个元素进行处理
# rdd不是二维嵌套列表,flatMap处理错误
# rdd = rdd1.flatMap(lambda x:x)
# print(rdd.collect())
rdd_flatMap= rdd2.flatMap(lambda x:x) # 将二维数据转化一维数据
print(rdd_flatMap.collect())

rdd_map = rdd_flatMap.map(lambda x:x*2)
print(rdd_map.collect())

rdd_map2 = rdd2.map(lambda x:x[1])
print(rdd_map2.collect())
  • fliter

  • rdd.filter(lambda 参数:参数条件过滤)

  • 条件过滤的书写和Python中if判断一样

from pyspark import SparkContext

sc = SparkContext()

# 生成rdd数据
rdd = sc.parallelize([1,2,3,4])
rdd2 = sc.parallelize([1,2,3,4,None,None])

# fliter过滤数据
# lambda x:x%2==0  接受一个参数,获取rdd中的每个元素数据,进行x%2==0判断,符合条件的数据会返回到新的rdd
rdd_filter = rdd.filter(lambda x:x%2==0)
print(rdd_filter.collect())

rdd_filter2 = rdd2.filter(lambda x:x is not None)
print(rdd_filter2.collect())
  • distinct 去重

  • 不需要lambda rdd.distinct

from pyspark import SparkContext

sc = SparkContext()

# 生成rdd数据
rdd = sc.parallelize([1, 2, 3, 4, 2, 1])
rdd_kv = sc.parallelize([('a', 1), ('b', 2), ('a', 1),('a', 2)])

# 去重 返回一个新的rdd
rdd_distinct = rdd.distinct()

print(rdd_distinct.collect())

# kv类型的去重
rdd_distinct2 = rdd_kv.distinct()

print(rdd_distinct2.collect())
  • groupBy 分区

  • rdd.groupBy(lambda 参数:根据参数编写分区条件)

  • mapValues(list)

from pyspark import SparkContext

sc = SparkContext()

# 生成rdd数据
rdd = sc.parallelize([1,2,3,4,5,6])
# 分组  lambda x:x%2  接受一个参数,对rdd中的每个元素进行处理,计算规则  x%2
# 对每个元素进行取余,余数相同的数据放在一起
rdd_groupBy = rdd.groupBy(lambda x:x%2)

print(rdd_groupBy.collect())

# mapValues 获取(k,v)结构中的value值
# lambda x:list(x) 接受一个参数,获取vlaue值部分,然后进行处理
rdd_mapValues=rdd_groupBy.mapValues(lambda x:list(x))
print(rdd_mapValues.collect())

rdd_kv = sc.parallelize([('a',1),('b',2)])
rdd_mapValues1=rdd_kv.mapValues(lambda x:x+1)
print(rdd_mapValues1.collect())
  • k-v数据 [(k,v),(k1,v1)]

  • groupByKey() - rdd.groupByKey()

  • reduceByKey() - rdd.reduceByKey(lambda 参数1,参数2:对两个参数计算)

  • sortByKey() - rdd.sortByKey()

from pyspark import SparkContext

sc = SparkContext()

rdd = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('a', 2)])

# 按照k值分组 不需要lambda  返回一个新的rdd
rdd_groupByKey = rdd.groupByKey()

print(rdd_groupByKey.collect())

rdd_mapValues=rdd_groupByKey.mapValues(lambda x:list(x))
print(rdd_mapValues.collect())

# 分组聚合  相同k值数据放在一起然后聚合
# lambda x,y:x+y
rdd_reduceByKey = rdd.reduceByKey(lambda x,y:x+y)
print(rdd_reduceByKey.collect())

# 按照k值排序  不需要lamda  返回一个新的rdd
rdd_sortByKey= rdd.sortByKey()
print(rdd_sortByKey.collect())  # 默认是升序

rdd_sortByKey2= rdd.sortByKey(ascending=False)  # 降序
print(rdd_sortByKey2.collect())
  • sortBy() 排序

  • rdd.sortBy(lambda x:x,ascending=False)

from pyspark import SparkContext

sc = SparkContext()

rdd1 = sc.parallelize([1, 4, 3, 2])

# 排序
# lambda x:x 接受一个参数,对rdd中元素数据排序
rdd_sortBy = rdd1.sortBy(lambda x: x)
rdd_sortBy2 = rdd1.sortBy(lambda x: x, ascending=False)

print(rdd_sortBy.collect())
print(rdd_sortBy2.collect())

rdd2 = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('a', 2)])
# k-v排序  x接受每个元素,元素是一个元祖,通过下标确认按照哪个进行排序
rdd_sortBy3 = rdd2.sortBy(lambda x: x[0])
rdd_sortBy4 = rdd2.sortBy(lambda x: x[1])
print(rdd_sortBy3.collect())
print(rdd_sortBy4.collect())

2-4 常用action算子

  • collect() 取出所有值

  • rdd.collect() - reduce() 非k-v类型数据累加

  • rdd.reduce(lambda 参数1,参数2:两个参数计算) - count() 统计rdd元素个数

  • rdd.count() - take() 取出指定数量值

  • rdd.take(数量)

from pyspark import SparkContext

sc = SparkContext()

# 生成rdd数据
rdd = sc.parallelize([1, 2, 3, 4])

#action算子
# 获取rdd中所有数据
res = rdd.collect()
print(res)

# 统计rdd的元素个数
res = rdd.count()
print(res)

# 指定获取数据个数  类似mysql中limit
res =rdd.take(2)
print(res)

# 非kv数据聚合
res =rdd.reduce(lambda x,y:x+y)
print(res)

五、案例

词频统计

from pyspark import SparkContext

sc = SparkContext()

# 读取hdfs文件数据转为rdd数据,实际工作中都是读hdfs文件数据
rdd = sc.textFile('hdfs://node1:8020/data/data.txt')

print(rdd.collect())

rdd_map=rdd.map(lambda x:x.split(','))
print(rdd_map.collect())

rdd_flatMap = rdd_map.flatMap(lambda x:x)
print(rdd_flatMap.collect())

rdd_map2=rdd_flatMap.map(lambda x:(x,1))
print(rdd_map2.collect())

rdd_reduceByKey=rdd_map2.reduceByKey(lambda x,y:x+y)
print(rdd_reduceByKey.collect())

rdd_soutBy = rdd_reduceByKey.sortBy(lambda x:x[1],ascending=False)
print(rdd_soutBy.collect())

本文转载自: https://blog.csdn.net/qq_71257020/article/details/135480558
版权归原作者 陆水A 所有, 如有侵权,请联系我们删除。

“Spark核心--RDD介绍”的评论:

还没有评论