文章目录
- 键值对RDD通常用来做聚合计算
- 一般先通过一些初始ETL(Extract抽取、Transform转化、Load装载)操作来将数据转化为键值对形式
- Java中在创建pair的时候会用到二元组Tuple2
Pair RDD的转化操作
键值对:rdd ={(1,2), (3,4), (3,6)}; other={(3,9)}
函数名目的示例结果reduceByKey(func)合并具有相同键的值rdd.reduceByKey((x,y) => x+y){(1, 2), (3, 10)}groupByKey()对具有相同键的值进行分组rdd.groupByKey(){(1, [2]), (3, [4,6])}combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)使用不同的返回类型合并具有相同键的值示例见后续mapValue(func)对pair RDD中的每个值应用一个函数而不改变键rdd.mapValues(x => x+1){(1,3), (3,5), (3,7)}flatMapValues(func)对pair RDD中的每个值应用一个返回迭代器的函数,然后对返回的每个元素都生成一个对应原键的键值对记录。通常用于符号化。rdd.flatMapValue(x => (x to 5)){(1, 2), (1,3), (1,4), (1,5), (3,4), (3,5)}keys()返回仅包含键的RDDrdd.keys{1, 3, 3}values()返回仅包含值的RDDrdd.values(){2, 4, 6}sortByKey()返回一个根据键排序的RDDrdd.sortByKey(){(1,2), (3,4),(3,6)}subtractByKey()删掉RDD中键相同的元素rdd.substractByKey(other){(1,2)}join对两个RDD进行内连接rdd.join(other){(3, (4,9)), (3,(6,9))}cogroup将两个RDD中拥有相同键的数据分组到一起rdd.cogroup(other){(1, ([2],[])), (3, ([4,6],[9])))}
聚合操作
- 针对与相同键的rdd的转化操作
类似于MapReduce中的合并器
在为每个键计算全局的总结果之前,先自动在每台机器上进行本地合并,
rdd.mapValues(x =>(x,1)).reduceByKey((x, y)=>(x._1 + y._1, x._2 + y._2))
combineByKey
- 可以让用户返回与输入数据的类型不同的返回值
- 会遍历分区中所有的元素
原理
- 在遇到一个新元素时,combineByKey() 会用createCombiner()的函数来创建那个键对应的累加器的初始值
- 每个分区在遇到第一次出现的键都会创建一个对应的累加器
- 如果是已经遇到的值,会用mergeValue()将该键的累加器对应的当前值,与新的值进行合并
- 由于每个分区独立处理,可以用mergeCombiners()将各个分区累加器里的结果进行合并
并行度调优
- 每个RDD都有固定数目的分区(partition),分区数决定了在RDD上执行操作时的并行度(degree of parallelism)。
- 在执行聚合或分组操作时,可以要求Spark使用给定的分区数。Spark会根据集群的大小推断出一个有意义的默认值,也可以对并行度进行调优来获取更好的性能表现。
data =[("a",3),("b",4),("a",1)]# 默认并行度sc.parallelize(data).reduceByKey(lamda x, y: x + y)# 自定义并行度sc.parallelize(data).reduceByKey(lamda x, y: x + y, 10)
- repartition()函数 把数据通过网络进行混洗,并创建出新的分区集合。对数据进行重新分区是代价相对比较大的操作 优化版:coalesce() 使用rdd.partitions.size/rdd.getNumPartitions来查看RDD的分区数,并确保调用coalesce() 时将RDD合并到比现在的分区数更少的分区中。
数据分组groupByKey
- groupByKey():对于一个由类型K的键和类型V的值组成的RDD,所得到的结果RDD类型是[K, Iterable[V]]
- rdd.reduceByKey(func) 与 rdd.groupByKey().mapValues(value => value.reduce(func))等价。但前者更为高效,因为它避免了为每个键创建存放值的列表的步骤。
连接Join
- leftOuterJoin产生的pairRdd中,每个键对应的值是由一个源RDD中的值与一个包含第二个RDD值的Option/Optional, 因为右表可能没有左表对应值,所以会有不存在,即null的情况。
- Optional类型可以通过isPresent()查看值是否存在,如果存在,则可以调用get()获取对象实例
数据排序sortByKey
sortByKey()
# Python
rdd.sortByKey(ascending=True, numPartitions=None, keyfunc =lambda x:str(x))
// JavaclassIntegerComparatorimplementsComparator<Integer>{publicintcompare(Integer a,Integer b){returnString.valueOf(a).compareTo(String.valueOf(b));}}
rdd.sortByKey(comp);
版权归原作者 搏一搏小单车变大摩托 所有, 如有侵权,请联系我们删除。