0


Spark的reduceByKey方法使用

一、需求

在ODPS上我们有如下数据:
idcategory_idattr_idattr_nameattr_value205348100000462最优粘度["0W-40"]205348100000461基础油类型["全合成"]205348100000463级别["BMW Longlife 01"]
我们希望得到的结果如下:

(205348, 10000046, "基础油类型:全合成\n最优粘度:0W-40\n级别:BMW Longlife 01\n")

需求解读:

需要将(id, category_id)作为key,然后将(attr_id, attr_name, attr_value)进行reduce操作,在reduce之后的数据中对attr_id进行排序,再将attr_name和attr_value合并在一起。

二、reduce操作之字符串方式

这个是最简单的方式,大致思路如下:

首先,将(id, category_id)作为key。

然后,将attr_id、attr_name、attr_value合并成一个字符串attr_info:attr_id + "#" + attr_name + "#" + attr_value,然后attr_info再通过"&"进行合并。

示例代码如下:

xx.map{case(id, category_id, attr_id, attr_name, attr_value) => ((id, category_id), attr_id + "#" + attr_name + "#" + attr_value)}
    .reduceByKey(_ + "&" + _, 100)

然后在接下来的流程中首先split("#")得到不同的attr信息,再通过split("&")得到不同的attr的列信息。这就要求attr_id,attr_name,attr_value中不能包含"#"和"&"字符串。

所以这种方式有缺陷,就是当attr_id,attr_name,attr_value包含了"#"和"&"字符串时需要先replace一下,这样就改变了原数据的值。

三、reduce操作之列表方式

这种方式相对复杂一点,需要对输入数据进行预处理,但是逻辑清晰。

输入数据中(id, category_id)是key保持不变,(item_id, item_name, item_value)是一组tuple。

reduce操作会在同一个partition中,不同的partition之间进行数据合并,这要求数据的输入、输出类型保持不变。

我们的初步想法:将item_id, item_name, item_value分别放到3个列表中,合并时就是列表之间的合并,合并完毕后使用时只需要遍历列表即可。

因为reduce操作的输入、输出类型不能变化,所以先放item_id, item_name, item_value初始化为一个列表,然后再进行列表之间的合并。

示例代码如下:

xx.map{case(id, category_id, attr_id, attr_name, attr_value) => 
      val itemIdList = new ArrayList[Long]()
      itemIdList.add(attr_id)
      val itemNameList = new ArrayList[String]()
      itemNameList.add(attr_name)
      val itemValueList = new ArrayList[String]()
      itemValueList.add(attr_value)
      ((id, category_id), (itemIdList, itemNameList, itemValueList))

}.reduceByKey((x, y) => {
      val itemIdList = new ArrayList[Long]()
      for(i <- 0 until x._1.size()){
        itemIdList.add(x._1.get(i))
      }
      for(i <- 0 until y._1.size()){
        itemIdList.add(y._1.get(i))
      }

      val itemNameList = new ArrayList[String]()
      for(i <- 0 until x._2.size()){
        itemNameList.add(x._2.get(i))
      }
      for(i <- 0 until y._2.size()){
        itemNameList.add(y._2.get(i))
      }

      val itemValueList = new ArrayList[String]()
      for(i <- 0 until x._3.size()){
        itemValueList.add(x._3.get(i))
      }
      for(i <- 0 until y._3.size()){
        itemValueList.add(y._3.get(i))
      }

      (itemIdList, itemNameList, itemValueList)
}, 100)

四、reduce之partition属性

首先提一下Shuffle过程,它的本意是洗牌、混乱的意思,类似于java中的Colletions.shuffle(List)方法,它会随机地打乱参数list里地元素顺序。MapReduce的Shuffle过程大致可以理解成:数据从map task输出到reduce task输入的这段过程。

而partition过程:分割map每个节点的结果,按照key分别映射给不同的reduce,这个是可以自定义的。

通过设置reduce中的numPartitions值,会在reduce操作之后进行repartition,避免数据不均衡堆在一个partition中。

五、reduceByKey和groupByKey的区别

从 shuffle 的角度: reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。

从功能的角度: reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用 groupByKey 。reduceByKey的分区内和分区间的计算规则是一样的

标签: spark java ajax

本文转载自: https://blog.csdn.net/benben044/article/details/136387492
版权归原作者 数据猴赛雷 所有, 如有侵权,请联系我们删除。

“Spark的reduceByKey方法使用”的评论:

还没有评论