

【PySpark】Common Basic Data Analysis Operations

Table of Contents

I. The pyspark.sql Module

1. Window Functions

# Group and aggregate: find each user's 3 most recently collected beats (using a window function)
from pyspark.sql.window import Window
import pyspark.sql.functions as F

window_out = Window.partitionBy("user_id") \
                   .orderBy(F.desc("collect_time"))
# user_feed.withColumn("rank", F.rank().over(window_out)).show(truncate=False)
# user_feed.withColumn("rank", F.rank().over(window_out)).show(40)
user_feed_test = user_feed.withColumn("rank", F.row_number().over(window_out)) \
                          .where(F.col('rank') <= 3)
user_feed_test.show(30)

The result is shown below. As with MySQL window functions, the rows are partitioned by user_id and sorted within each group; here I keep only the top 3 rows by collect_time, i.e. each user's 3 most recent collections:

+--------+-------+------------+--------------------+----+
| user_id|beat_id|collect_type|        collect_time|rank|
+--------+-------+------------+--------------------+----+
|10065188| 827272|           4|2021-08-22 04:54:...|   1|
|10065188| 885812|           5|2020-10-23 18:53:...|   2|
|10068979|1069390|           5|2021-06-20 07:44:...|   1|
|10074915|     -2|           4|2021-11-27 13:42:...|   1|
|10074915|1122682|           4|2021-09-07 14:26:...|   2|
|10075397| 947751|           4|2022-01-30 07:30:...|   1|
|10075397| 336641|           5|2022-01-30 07:23:...|   2|
|10075397| 886179|           4|2022-01-05 10:35:...|   3|
|10104842| 886462|           1|2021-02-28 17:04:...|   1|
|10122654|1531961|           4|2022-03-16 11:09:...|   1|
|10122654| 893655|           4|2022-03-15 04:32:...|   2|
|10122654| 303121|           4|2022-03-14 05:59:...|   3|
|10134095|      0|           3|2021-07-24 13:02:...|   1|
|10134095|1023250|           4|2021-07-22 00:31:...|   2|
|10139927|      0|           5|2020-09-05 19:14:...|   1|
|10139927|      0|           5|2020-09-03 17:51:...|   2|
|10245428| 889915|           5|2020-05-18 14:41:...|   1|
|10245428|1073074|           5|2020-05-18 14:07:...|   2|
+--------+-------+------------+--------------------+----+

2. Renaming and Adding Columns:

For example, given a personnel table, add a new Country column:

# Renaming / modifying columns
from pyspark.sql.functions import col, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# df2 = df1.withColumn("avg_resp_rate", col("sum_def_imps")/col("sum_count")) \
#          .withColumn("avg_ctr", col("sum_clicks")/col("sum_imps"))

# another example
data = [('James', '', 'Smith', '1991-04-01', 'M', 3000),
        ('Michael', 'Rose', '', '2000-05-19', 'M', 4000),
        ('Robert', '', 'Williams', '1978-09-05', 'M', 4000),
        ('Maria', 'Anne', 'Jones', '1967-12-01', 'F', 4000),
        ('Jen', 'Mary', 'Brown', '1980-02-17', 'F', -1)]
print(data)
"""
[('James', '', 'Smith', '1991-04-01', 'M', 3000), 
('Michael', 'Rose', '', '2000-05-19', 'M', 4000), 
('Robert', '', 'Williams', '1978-09-05', 'M', 4000), 
('Maria', 'Anne', 'Jones', '1967-12-01', 'F', 4000), 
('Jen', 'Mary', 'Brown', '1980-02-17', 'F', -1)]
"""

First define the column names and create the DataFrame, then add a new column with withColumn, where lit("ABC") fills the entire column with the string ABC:

# The schema only needs the column names here
columns = ["firstname", "middlename", "lastname", "dob", "gender", "salary"]
df = spark.createDataFrame(data=data, schema=columns)
df.show()

# Add or modify a column
df2 = df.withColumn("salary", col("salary").cast("Integer"))
df2.show()

df3 = df.withColumn("salary", col("salary") * 100)
df3.show()

# lit fills the whole column with the same literal value
df5 = df.withColumn("Country", lit("ABC"))
df5.show()

The results are as follows:

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|300000|
|  Michael|      Rose|        |2000-05-19|     M|400000|
|   Robert|          |Williams|1978-09-05|     M|400000|
|    Maria|      Anne|   Jones|1967-12-01|     F|400000|
|      Jen|      Mary|   Brown|1980-02-17|     F|  -100|
+---------+----------+--------+----------+------+------+

+---------+----------+--------+----------+------+------+-------+
|firstname|middlename|lastname|       dob|gender|salary|Country|
+---------+----------+--------+----------+------+------+-------+
|    James|          |   Smith|1991-04-01|     M|  3000|    ABC|
|  Michael|      Rose|        |2000-05-19|     M|  4000|    ABC|
|   Robert|          |Williams|1978-09-05|     M|  4000|    ABC|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|    ABC|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|    ABC|
+---------+----------+--------+----------+------+------+-------+

3. Splitting One Column into Multiple Columns on a Delimiter

This can be done with withColumn and split; see my earlier post 【Pyspark基础】sql获取user最近3次使用的item.

More references:
https://www.cnblogs.com/360aq/p/13269417.html
https://www.pythonheidong.com/blog/article/690421/aa556949151c244e81f8/

4. Converting Between pandas and Spark DataFrames:

  • To convert a Spark DataFrame to a pandas DataFrame, call toPandas()
  • To create a Spark DataFrame from a pandas DataFrame, use createDataFrame(pandas_df)

More references:
Xiamen University Database Lab, converting data between PySpark and pandas with Arrow: http://dblab.xmu.edu.cn/blog/2752-2/

5. Error: ValueError: Some of types cannot be determined after inferring

This happens when Spark cannot infer the type of some field:

(1) Increase the sampling ratio used for type inference:

sqlContext.createDataFrame(rdd, samplingRatio=0.01)

(2) Explicitly declare the schema of the DataFrame to be created:

# Method 1: pass an explicit schema to createDataFrame
from pyspark.sql.types import *

schema = StructType([
    StructField("c1", StringType(), True),
    StructField("c2", IntegerType(), True)])
df = sqlContext.createDataFrame(rdd, schema=schema)

# Method 2: use toDF
from pyspark.sql.types import *

schema = StructType([
    StructField("c1", StringType(), True),
    StructField("c2", IntegerType(), True)])
df = rdd.toDF(schema=schema)

Reference: https://www.codeleading.com/article/64083243294/

6. Shuffling a DataFrame by Row

Generate a random number for each row, sort by it, then drop the random column.

import pyspark.sql.functions as F

# Create a DataFrame from an RDD
schema = StructType(fields)
df_1 = spark.createDataFrame(rdd, schema)

# Shuffle: pyspark.sql.functions.rand generates random doubles in [0.0, 1.0]
df_2 = df_1.withColumn('rand', F.rand(seed=42))
# Sort by the random column
df_rnd = df_2.orderBy(df_2.rand)
# Drop the random column
df = df_rnd.drop(df_rnd.rand)

7. Joining Tables

Understanding and using Spark DataFrames: joining two DataFrames
SQL database language basics: multi-table queries and INNER JOIN in SqlServer
Join types between SQL tables: inner join / left join / right join / full join syntax and usage examples

8. DataFrame Operations

9. Several Ways to Use createDataFrame

(1) Note the StructType format (official documentation): the schema can be defined with StructType. On its usage:

# Read the beat data
schema = StructType([StructField("beatid", StringType(), True),
                     StructField("name", StringType(), True),
                     StructField("language", StringType(), True)])
beats = spark.read.csv("filepath", header=False, schema=schema)
beats.show()

(2) Creating a Spark DataFrame from a pandas DataFrame

# Create a Spark DataFrame from a pandas DataFrame
import pandas as pd

colors = ['white', 'green', 'yellow', 'red', 'brown', 'pink']
color_df = pd.DataFrame(colors, columns=['color'])
color_df['length'] = color_df['color'].apply(len)

color_df = spark.createDataFrame(color_df)
color_df.show()

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.types.StructType.html?highlight=structtype#pyspark.sql.types.StructType

10. Converting Between pandas and Spark DataFrames: Processing a pandas DataFrame Indirectly with SQL

pandasDF_out.createOrReplaceTempView("pd_data")
spark.sql("select * from pd_data").show()

res = spark.sql("""select * from pd_data 
                where math >= 90 
                order by english desc""")
res.show()

output_DF = res.toPandas()
print(type(output_DF))

More reference: https://blog.csdn.net/weixin_46408961/article/details/120407900

11. Filtering with filter

Rows can be selected with filter, e.g. to find the rows whose category_title is 00后:

# Method 1: filter
category_beat.filter(" category_title = '00后' ").head(5)

https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.filter.html

12. Adding or Modifying a Column of a spark.sql DataFrame

Official documentation of pyspark.sql.DataFrame.withColumn: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.withColumn.html?highlight=withcolumn#pyspark.sql.DataFrame.withColumn

13. Saving a DataFrame as a CSV File

Here repartition(num) is optional:

# save positive_res_data
file_location = "file:///home/hadoop/development/RecSys/data/"
positive_res_data.repartition(2).write.csv(file_location + 'positive_res_data.csv')
# Saved successfully, as multiple part files

save can also be used:
https://wenku.baidu.com/view/1329194ea75177232f60ddccda38376baf1fe078.html

14. Extracting the Content of a Table Cell

The initial table category_beat looks like this:

+--------------------------+----------------------------------+
|            category_title|                 collect_set(name)|
+--------------------------+----------------------------------+
|                      00后|[白月光与朱砂痣, 致姗姗来迟的你...|
|                04年唱的歌|  [祝我生日快乐, 旅行的意义, 柠...|
|10年前没有iPhone但有这些歌|      [逆光, 改变自己, 达尔文, 最...|
+--------------------------+----------------------------------+

Now we want the collect_set entry of the first row, i.e. the corresponding set:

import random

ans = category_beat.where(col("category_title") == '00后') \
                   .select(col("collect_set(name)"))
ans.show()

# Take the set out of the row and turn it into a list
lst = ans.take(1)[0][0]  # take(1) returns the first row

lst = list(lst)
print(lst, "\n")
# Note: random.randint(a, b) includes both endpoints, so the upper bound is len(lst) - 1
print("Recommended song:", lst[random.randint(0, len(lst) - 1)])
"""
+----------------------------------+
|                 collect_set(name)|
+----------------------------------+
|[白月光与朱砂痣, 致姗姗来迟的你...|
+----------------------------------+

['白月光与朱砂痣', '致姗姗来迟的你', '陨落', '踏山河', '花.间.酒', '山海', '

Recommended song: 山海
"""

15. Using agg with groupBy

II. The Spark Core Module

2.1 Passing Arguments to UDF Functions:

https://blog.csdn.net/yeshang_lady/article/details/121570361

2.2 pandas core dataframe

The RDD API can be used here.

2.3 RDD Operations

RDDs are central to Spark; their operators fall into two classes:
【Transformations】map, flatMap, groupByKey, reduceByKey, etc.
【Actions】collect, count, take, top, first, etc.

2.4 The filter Operation

rdd = sc.parallelize([1, 2, 3, 4, 5, 6])
rdd = rdd.filter(lambda x: x % 2 == 0)
rdd.collect()  # [2, 4, 6]

2.5 flatMap

Applies the function to every element of the RDD and flattens the results.

rdd = sc.parallelize([3, 4, 5])
fm = rdd.flatMap(lambda x: range(1, x))
fm.collect()  # [1, 2, 1, 2, 3, 1, 2, 3, 4]

III. The MLlib Module

MLlib (DataFrame-based)
MLlib (RDD-based)

3.1 K-means Clustering

The API itself is not hard to use; it is similar to sklearn's:

from pyspark.ml.clustering import KMeans

kMeans = KMeans(k=25, seed=1)
model = kMeans.fit(kmeans_data.select('features'))
model.transform(kmeans_data).show(1000)

# Analyze the training results
# Whether the trained model has a summary
print(model.hasSummary, "\n")
# Cluster centers
print(model.clusterCenters(), "\n")
# Size of each cluster (number of data points it contains)
print(model.summary.clusterSizes)

3.2 GBDT Classification and Regression

IV. Recommendation Algorithms

4.1 Datagrand Data Competition: 3 Improved DL Algorithms

http://www.360doc.com/content/17/0615/21/16619343_663476259.shtml

【Installing PySpark】
Installing PySpark on Ubuntu: https://zhuanlan.zhihu.com/p/34635519
Installation package from the Apache website: https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz

Reference

[1] https://spark.apache.org/docs/latest/api/python/reference/index.html
[2] Data analysis for algorithm engineers: https://zhuanlan.zhihu.com/p/343375787
[3] Several ways to create tables with createDataFrame
[4] Shuffling a Spark DataFrame by row
[5] Common DataFrame operations:
0)https://blog.csdn.net/xc_zhou/article/details/118617642
1)https://blog.csdn.net/yeshang_lady/article/details/89528090
2)https://www.jianshu.com/p/acd96549ee15
3)https://www.dandelioncloud.cn/article/details/1441272697576869890


Reprinted from: https://blog.csdn.net/qq_35812205/article/details/124715851
Copyright belongs to the original author 山顶夕景. In case of infringement, please contact us for removal.
