0


PySpark求解连通图问题

前文回顾:

PySpark与GraphFrames的安装与使用
https://xxmdmst.blog.csdn.net/article/details/123009617

networkx快速解决连通图问题
https://xxmdmst.blog.csdn.net/article/details/123012333

前面我讲解了PySpark图计算库的使用以及纯python解决连通图问题的两个示例。这篇文章我们继续对上次的连通图问题改用PySpark实现。

需求1:找社区

刘备和关羽有关系,说明他们是一个社区,刘备和张飞也有关系,那么刘备、关羽、张飞归为一个社区,以此类推。

image-20220218200637173

对于这个连通图问题使用Pyspark如何解决呢?

首先,我们创建spark对象:

  1. from pyspark.sql import SparkSession, Row
  2. from graphframes import GraphFrame
  3. spark = SparkSession \
  4. .builder \
  5. .appName("PySpark") \
  6. .master("local[*]") \
  7. .getOrCreate()
  8. sc = spark.sparkContext
  9. # 设置检查点目录
  10. sc.setCheckpointDir("checkpoint")

然后构建数据:

  1. data =[['刘备','关羽'],['刘备','张飞'],['张飞','诸葛亮'],['曹操','司马懿'],['司马懿','张辽'],['曹操','曹丕']]
  2. data = spark.createDataFrame(data,["人员","相关人员"])
  3. data.show()
  1. +------+--------+
  2. | 人员|相关人员|
  3. +------+--------+
  4. | 刘备| 关羽|
  5. | 刘备| 张飞|
  6. | 张飞| 诸葛亮|
  7. | 曹操| 司马懿|
  8. |司马懿| 张辽|
  9. | 曹操| 曹丕|
  10. +------+--------+

很明显原始数据就是图计算所要求的边数据,只修改一下列名即可:

  1. edges = data.toDF("src","dst")
  2. edges.printSchema()
  1. root
  2. |-- src: string (nullable = true)
  3. |-- dst: string (nullable = true)

下面我们开始构建顶点数据:

  1. vertices =(
  2. edges.rdd.flatMap(lambda x: x).distinct().map(lambda x: Row(x)).toDF(["id"]))
  3. vertices.show()
  1. +------+
  2. | id|
  3. +------+
  4. |诸葛亮|
  5. | 刘备|
  6. | 曹操|
  7. |司马懿|
  8. | 曹丕|
  9. | 关羽|
  10. | 张飞|
  11. | 张辽|
  12. +------+

下面使用spark的图计算 计算连通图:

  1. g = GraphFrame(vertices, edges)
  2. result = g.connectedComponents().orderBy("component")
  3. result.show()
  1. +------+------------+
  2. | id| component|
  3. +------+------------+
  4. |司马懿| 0|
  5. | 张辽| 0|
  6. | 曹丕| 0|
  7. | 曹操| 0|
  8. | 关羽|635655159808|
  9. | 刘备|635655159808|
  10. | 张飞|635655159808|
  11. |诸葛亮|635655159808|
  12. +------+------------+

可以看到结果中已经顺利将一个社区的成员通过一个相同的component标识出来,成功解决需求。

需求2:统一用户识别

abcde这5个字段表示mac地址,ip地址,device_id,imei等唯一标识,tags表示用户的标签。由于某些原因,同一用户的唯一标识字段总是有几个字段存在缺失,现在要求将同一个用户的数据都能识别出来,同时将每个用户的标签进行合并。原始数据和结果模型示例如下:

img

首先,我们构建数据:

  1. df = spark.createDataFrame([['a1',None,'c1',None,None,'tag1'],[None,None,'c1','d1',None,'tag2'],[None,'b1',None,'d1',None,'tag3'],[None,'b1','c1','d1','e1','tag4'],['a2','b2',None,None,None,'tag1'],[None,'b4','c4',None,'e4','tag1'],['a2',None,None,'d2',None,'tag2'],[None,None,'c2','d2',None,'tag3'],[None,'b3',None,None,'e3','tag1'],[None,None,'c3',None,'e3','tag2'],],list("abcde")+["tags"])
  2. df.show()

结果:

  1. +----+----+----+----+----+----+
  2. | a| b| c| d| e|tags|
  3. +----+----+----+----+----+----+
  4. | a1|null| c1|null|null|tag1|
  5. |null|null| c1| d1|null|tag2|
  6. |null| b1|null| d1|null|tag3|
  7. |null| b1| c1| d1| e1|tag4|
  8. | a2| b2|null|null|null|tag1|
  9. |null| b4| c4|null| e4|tag1|
  10. | a2|null|null| d2|null|tag2|
  11. |null|null| c2| d2|null|tag3|
  12. |null| b3|null|null| e3|tag1|
  13. |null|null| c3|null| e3|tag2|
  14. +----+----+----+----+----+----+

接下来的思路依然跟上次一样,首先为每一行数据分配一个唯一id,然后对每个唯一标识的列,根据是否一样构建行与行之间的连接关系,所有的唯一标识列产生的连接关系共同作为图计算的边。

下面使用RDD的zipWithUniqueId方法为每一行产生一个唯一ID,并将这个ID移动到最前(由于这个数据后面可能会多次被频繁使用所以缓存起来):

  1. tmp = df.rdd.zipWithUniqueId().map(lambda x:(x[1], x[0]))
  2. tmp.cache()
  3. tmp.first()
  1. (0, Row(a='a1', b=None, c='c1', d=None, e=None, tags='tag1'))

根据唯一id构建顶点数据:

  1. vertices = tmp.map(lambda x: Row(x[0])).toDF(["id"])
  2. vertices.show()
  1. +---+
  2. | id|
  3. +---+
  4. | 0|
  5. | 1|
  6. | 7|
  7. | 2|
  8. | 8|
  9. | 3|
  10. | 4|
  11. | 10|
  12. | 5|
  13. | 11|
  14. +---+

接下来,构建边数据:

  1. deffunc(p):for k, ids in p:
  2. ids =list(ids)
  3. n =len(ids)if n <=1:continuefor i inrange(n-1):for j inrange(i+1, n):yield(ids[i], ids[j])
  4. edges =[]
  5. keylist =list("abcde")for key in keylist:
  6. data = tmp.mapPartitions(lambda area:[(row[key], i)for i, row in area if row[key]])
  7. edgeRDD = data.groupByKey().mapPartitions(func)
  8. edges.append(edgeRDD)
  9. edgesDF = sc.union(edges).toDF(["src","dst"])
  10. edgesDF.cache()
  11. edgesDF.show()
  1. +---+---+
  2. |src|dst|
  3. +---+---+
  4. | 8| 4|
  5. | 7| 2|
  6. | 0| 1|
  7. | 0| 2|
  8. | 1| 2|
  9. | 4| 10|
  10. | 1| 7|
  11. | 1| 2|
  12. | 7| 2|
  13. | 5| 11|
  14. +---+---+

可以看到所有的行号关系已经被成功获取。

下面使用图计算 计算出属于同一用户的行:

  1. gdf = GraphFrame(vertices, edgesDF)
  2. components = gdf.connectedComponents()
  3. components.show()
  1. +---+---------+
  2. | id|component|
  3. +---+---------+
  4. | 0| 0|
  5. | 1| 0|
  6. | 7| 0|
  7. | 2| 0|
  8. | 8| 4|
  9. | 3| 3|
  10. | 4| 4|
  11. | 10| 4|
  12. | 5| 5|
  13. | 11| 5|
  14. +---+---------+

有了行号和所归属的组唯一标识,我们可以通过表连接获取原始数据的每一行所归属的component:

  1. result = tmp.cogroup(components.rdd) \
  2. .map(lambda pair: pair[1][0].data[0]+ Row(pair[1][1].data[0])) \
  3. .toDF(df.schema.names+["component"])
  4. result.cache()
  5. result.show()
  1. +----+----+----+----+----+----+---------+
  2. | a| b| c| d| e|tags|component|
  3. +----+----+----+----+----+----+---------+
  4. | a1|null| c1|null|null|tag1| 0|
  5. |null|null| c1| d1|null|tag2| 0|
  6. |null| b1| c1| d1| e1|tag4| 0|
  7. |null| b4| c4|null| e4|tag1| 3|
  8. | a2|null|null| d2|null|tag2| 4|
  9. |null| b3|null|null| e3|tag1| 5|
  10. |null| b1|null| d1|null|tag3| 0|
  11. | a2| b2|null|null|null|tag1| 4|
  12. |null|null| c2| d2|null|tag3| 4|
  13. |null|null| c3|null| e3|tag2| 5|
  14. +----+----+----+----+----+----+---------+

可以看到我们已经成功的进行同一用户识别了,剩下的只需要分组并使用pandas的逻辑合并数据:

  1. deffunc(pdf):
  2. row = pdf[keylist].bfill().head(1)
  3. row["tags"]= pdf.tags.str.cat(sep=",")return row
  4. result.groupBy("component").applyInPandas(
  5. func, schema="a string, b string, c string, d string, e string, tags string").show()
  1. +----+---+---+----+----+-------------------+
  2. | a| b| c| d| e| tags|
  3. +----+---+---+----+----+-------------------+
  4. | a1| b1| c1| d1| e1|tag1,tag2,tag4,tag3|
  5. |null| b4| c4|null| e4| tag1|
  6. | a2| b2| c2| d2|null| tag2,tag1,tag3|
  7. |null| b3| c3|null| e3| tag1,tag2|
  8. +----+---+---+----+----+-------------------+

可以看到已经顺利得到需要的结果。

注意:applyInPandas要求返回的结果必须是pandas的datafream对象,所以相对之前的逻辑由.iloc[0]改成了.head(1)

如果你的spark不是3.X版本,没有applyInPandas方法,用原生rdd的方法则会麻烦很多:

  1. deffunc(pair):
  2. component, rows = pair
  3. keyList =list("abcde")
  4. ids ={}for row in rows:for key in keylist:
  5. v =getattr(row, key)if v:
  6. ids[key]= v
  7. ids.setdefault("tags",[]).append(row.tags)
  8. result =[]for key in keylist:
  9. result.append(ids.get(key))
  10. result.append(",".join(ids["tags"]))return result
  11. result2 = result.rdd.groupBy(lambda row: row.component).map(func).toDF(df.schema)
  12. result2.cache()
  13. result2.show()

结果也一样:

  1. +----+---+---+----+----+-------------------+
  2. | a| b| c| d| e| tags|
  3. +----+---+---+----+----+-------------------+
  4. | a1| b1| c1| d1| e1|tag1,tag2,tag4,tag3|
  5. |null| b4| c4|null| e4| tag1|
  6. | a2| b2| c2| d2|null| tag2,tag1,tag3|
  7. |null| b3| c3|null| e3| tag1,tag2|
  8. +----+---+---+----+----+-------------------+

本文转载自: https://blog.csdn.net/as604049322/article/details/123036398
版权归原作者 小小明-代码实体 所有, 如有侵权,请联系我们删除。

“PySpark求解连通图问题”的评论:

还没有评论