Part I: pyspark.sql
1. Window functions
# Group the data and find each user's 3 most recently collected beats (window function)
from pyspark.sql.window import Window
import pyspark.sql.functions as F

window_out = Window.partitionBy("user_id") \
                   .orderBy(F.desc("collect_time"))
# user_feed.withColumn("rank", F.rank().over(window_out)).show(truncate=False)
# user_feed.withColumn("rank", F.rank().over(window_out)).show(40)
user_feed_test = user_feed.withColumn("rank", F.row_number().over(window_out)) \
                          .where(F.col('rank') <= 3)
user_feed_test.show(30)
The result is below. This works much like a MySQL window function: the rows are partitioned by user_id and sorted within each partition, and I only keep the top 3 rows by collect_time, i.e. each user's 3 most recent collections:
+--------+-------+------------+--------------------+----+
| user_id|beat_id|collect_type|        collect_time|rank|
+--------+-------+------------+--------------------+----+
|10065188| 827272|           4|2021-08-22 04:54:...|   1|
|10065188| 885812|           5|2020-10-23 18:53:...|   2|
|10068979|1069390|           5|2021-06-20 07:44:...|   1|
|10074915|     -2|           4|2021-11-27 13:42:...|   1|
|10074915|1122682|           4|2021-09-07 14:26:...|   2|
|10075397| 947751|           4|2022-01-30 07:30:...|   1|
|10075397| 336641|           5|2022-01-30 07:23:...|   2|
|10075397| 886179|           4|2022-01-05 10:35:...|   3|
|10104842| 886462|           1|2021-02-28 17:04:...|   1|
|10122654|1531961|           4|2022-03-16 11:09:...|   1|
|10122654| 893655|           4|2022-03-15 04:32:...|   2|
|10122654| 303121|           4|2022-03-14 05:59:...|   3|
|10134095|      0|           3|2021-07-24 13:02:...|   1|
|10134095|1023250|           4|2021-07-22 00:31:...|   2|
|10139927|      0|           5|2020-09-05 19:14:...|   1|
|10139927|      0|           5|2020-09-03 17:51:...|   2|
|10245428| 889915|           5|2020-05-18 14:41:...|   1|
|10245428|1073074|           5|2020-05-18 14:07:...|   2|
+--------+-------+------------+--------------------+----+
2. Renaming and adding columns:
For example, given a table of personal information, add a new Country column:
# rename / derive columns
from pyspark.sql.functions import col
# df2 = df1.withColumn("avg_resp_rate", col("sum_def_imps")/col("sum_count")).withColumn("avg_ctr", col("sum_clicks")/col("sum_imps"))

# another example
from pyspark.sql.functions import col, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

data = [('James','','Smith','1991-04-01','M',3000),
        ('Michael','Rose','','2000-05-19','M',4000),
        ('Robert','','Williams','1978-09-05','M',4000),
        ('Maria','Anne','Jones','1967-12-01','F',4000),
        ('Jen','Mary','Brown','1980-02-17','F',-1)]
print(data)
"""
[('James', '', 'Smith', '1991-04-01', 'M', 3000),
 ('Michael', 'Rose', '', '2000-05-19', 'M', 4000),
 ('Robert', '', 'Williams', '1978-09-05', 'M', 4000),
 ('Maria', 'Anne', 'Jones', '1967-12-01', 'F', 4000),
 ('Jen', 'Mary', 'Brown', '1980-02-17', 'F', -1)]
"""
First give the column names and create the DataFrame, then add a new column with withColumn; here lit("ABC") fills the entire column with the literal string ABC:
# the schema only needs the column names here
columns = ["firstname", "middlename", "lastname", "dob", "gender", "salary"]
df = spark.createDataFrame(data=data, schema=columns)
df.show()

# add or modify a column
df2 = df.withColumn("salary", col("salary").cast("Integer"))
df2.show()

df3 = df.withColumn("salary", col("salary") * 100)
df3.show()

# lit("ABC") gives every row the same literal value
df5 = df.withColumn("Country", lit("ABC"))
df5.show()
The result:
+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|300000|
|  Michael|      Rose|        |2000-05-19|     M|400000|
|   Robert|          |Williams|1978-09-05|     M|400000|
|    Maria|      Anne|   Jones|1967-12-01|     F|400000|
|      Jen|      Mary|   Brown|1980-02-17|     F|  -100|
+---------+----------+--------+----------+------+------+

+---------+----------+--------+----------+------+------+-------+
|firstname|middlename|lastname|       dob|gender|salary|Country|
+---------+----------+--------+----------+------+------+-------+
|    James|          |   Smith|1991-04-01|     M|  3000|    ABC|
|  Michael|      Rose|        |2000-05-19|     M|  4000|    ABC|
|   Robert|          |Williams|1978-09-05|     M|  4000|    ABC|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|    ABC|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|    ABC|
+---------+----------+--------+----------+------+------+-------+
3. Splitting one column into multiple columns on a delimiter
You can do the split with withColumn together with split; see my earlier post 【Pyspark基础】sql获取user最近3次使用的item.
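As a minimal sketch (the full_name column, the sample rows, and the "_" delimiter are made up for illustration):

from pyspark.sql.functions import split, col

df = spark.createDataFrame([("James_Smith",), ("Maria_Jones",)], ["full_name"])
parts = split(col("full_name"), "_")                   # split on the "_" delimiter
df2 = (df.withColumn("first_name", parts.getItem(0))   # first piece
         .withColumn("last_name", parts.getItem(1)))   # second piece
df2.show()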
More references:
https://www.cnblogs.com/360aq/p/13269417.html
https://www.pythonheidong.com/blog/article/690421/aa556949151c244e81f8/
4. Converting between pandas and Spark DataFrames:
- To convert a Spark DataFrame to a pandas DataFrame, call toPandas();
- To create a Spark DataFrame from a pandas DataFrame, use createDataFrame(pandas_df) (a round-trip sketch follows this list).
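A round-trip sketch (the pdf column names are made up; the Arrow config line is optional and uses the Spark 3.x key):

import pandas as pd

# optional: enable Arrow to speed up the conversion (Spark 3.x config key)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

pdf = pd.DataFrame({"id": [1, 2, 3], "score": [0.5, 0.7, 0.9]})   # pandas DataFrame
sdf = spark.createDataFrame(pdf)                                  # pandas -> Spark
sdf.show()
pdf2 = sdf.toPandas()                                             # Spark -> pandas
print(type(pdf2))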
More references:
Xiamen University Database Lab, converting between PySpark and pandas with Arrow: http://dblab.xmu.edu.cn/blog/2752-2/
5. Error "ValueError: Some of types cannot be determined after inferring"
This happens because Spark cannot infer the type of some column. Two fixes:
(1) Increase the sampling ratio used for type inference:
sqlContext.createDataFrame(rdd, samplingRatio=0.01)
(2) Explicitly declare the schema of the DataFrame to be created:
# method 1: declare the schema explicitly
from pyspark.sql.types import *
schema = StructType([
    StructField("c1", StringType(), True),
    StructField("c2", IntegerType(), True)])
df = sqlContext.createDataFrame(rdd, schema=schema)

# method 2: use toDF
from pyspark.sql.types import *
schema = StructType([
    StructField("c1", StringType(), True),
    StructField("c2", IntegerType(), True)])
df = rdd.toDF(schema=schema)
Reference: https://www.codeleading.com/article/64083243294/
6. Shuffling the rows of a DataFrame
Generate a random number for every row, sort by it, then drop the random column (see reference [4]).
import pyspark.sql.functions as F
from pyspark.sql.types import StructType

# build a DataFrame from an rdd
schema = StructType(fields)
df_1 = spark.createDataFrame(rdd, schema)

# shuffle: pyspark.sql.functions.rand generates a random double in [0.0, 1.0)
df_2 = df_1.withColumn('rand', F.rand(seed=42))
# sort by the random column
df_rnd = df_2.orderBy(df_2.rand)
# drop the random column
df = df_rnd.drop(df_rnd.rand)
7. Joining tables
Spark DataFrame理解和使用之两个DataFrame的关联操作
SQL数据库语言基础之SqlServer多表连接查询与INNER JOIN内连接查询
SQL的表格之间的join连接方式——inner join/left join/right join/full join语法及其用法实例
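As a quick sketch of the common join types in the DataFrame API (the users and orders tables are made up for illustration):

users  = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["user_id", "name"])
orders = spark.createDataFrame([(1, 100), (1, 200), (3, 50)], ["user_id", "amount"])

orders.join(users, on="user_id", how="inner").show()   # only user_ids present on both sides
orders.join(users, on="user_id", how="left").show()    # keep every order, name may be null
orders.join(users, on="user_id", how="full").show()    # keep rows from both sides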
8. DataFrame operations — see the links collected under reference [5] below.
9. Several ways to call createDataFrame (see also reference [3])
(1) Note: the StructType format (official docs). The schema can be set with StructType; usage:
# read the beat data
schema = StructType([StructField("beatid", StringType(), True),
                     StructField("name", StringType(), True),
                     StructField("language", StringType(), True)])
beats = spark.read.csv("filepath", header=False, schema=schema)
# print(beats.show())
beats.show()
(2) Creating a Spark DataFrame from a pandas DataFrame
# create a Spark DataFrame from a pandas DataFrame
import pandas as pd

colors = ['white', 'green', 'yellow', 'red', 'brown', 'pink']
color_df = pd.DataFrame(colors, columns=['color'])
color_df['length'] = color_df['color'].apply(len)
color_df = spark.createDataFrame(color_df)
color_df.show()
10. Converting between pandas and Spark DataFrames, and processing pandas data indirectly through SQL
# register the DataFrame as a temporary view so it can be queried with SQL
pandasDF_out.createOrReplaceTempView("pd_data")

spark.sql("select * from pd_data").show()

res = spark.sql("""select * from pd_data
                   where math >= 90
                   order by english desc""")
res.show()

output_DF = res.toPandas()
print(type(output_DF))
More references: https://blog.csdn.net/weixin_46408961/article/details/120407900
11. Filtering with filter
Rows can be selected with filter, e.g. finding the rows whose category_title is 00后:
# method 1: filter with a SQL-style condition string
category_beat.filter(" category_title = '00后' ").head(5)
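An equivalent way (a sketch against the same category_beat table) is to build the condition with col() instead of a SQL string; where() is simply an alias of filter():

# method 2: filter / where with a column expression
from pyspark.sql.functions import col
category_beat.filter(col("category_title") == '00后').head(5)
category_beat.where(col("category_title") == '00后').head(5)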
12. Adding or modifying a column of a pyspark.sql DataFrame
Official documentation for pyspark.sql.DataFrame.withColumn: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.withColumn.html?highlight=withcolumn#pyspark.sql.DataFrame.withColumn
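A minimal sketch of both cases (the df, the score values, and the 60-point threshold are made up): withColumn adds a column when the name is new and overwrites it when the name already exists.

from pyspark.sql.functions import col, when

df = spark.createDataFrame([(1, 55), (2, 90)], ["id", "score"])
df = df.withColumn("passed", when(col("score") >= 60, 1).otherwise(0))   # new column
df = df.withColumn("score", col("score") + 5)                            # overwrite existing column
df.show()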
13. Saving a DataFrame as CSV
The repartition(num) call here is optional:
# save positive_res_data
file_location = "file:///home/hadoop/development/RecSys/data/"
positive_res_data.repartition(2).write.csv(file_location + 'positive_res_data.csv')
# saved successfully, as multiple part files
You can also use save():
https://wenku.baidu.com/view/1329194ea75177232f60ddccda38376baf1fe078.html
14. Extracting the contents of a table cell
The initial table category_beat looks like this:
+--------------------------+----------------------------------+
|            category_title|                 collect_set(name)|
+--------------------------+----------------------------------+
|                      00后|[白月光与朱砂痣, 致姗姗来迟的你...|
|                04年唱的歌|  [祝我生日快乐, 旅行的意义, 柠...|
|10年前没有iPhone但有这些歌|    [逆光, 改变自己, 达尔文, 最...|
+--------------------------+----------------------------------+
Now we want the collect_set cell of the first row, i.e. the corresponding set:
import random
from pyspark.sql.functions import col

ans = category_beat.where(col("category_title") == '00后').select(col("collect_set(name)"))
# type(ans)
ans.show()

# take the set out of the row and turn it into a list
lst = ans.take(1)[0][0]   # take(1) returns the first row
lst = list(lst)
print(lst, "\n")
# randint's upper bound is inclusive, so use len(lst) - 1 to avoid an IndexError
print("推荐的歌曲:", lst[random.randint(0, len(lst) - 1)])
"""
+----------------------------------+
|                 collect_set(name)|
+----------------------------------+
|[白月光与朱砂痣, 致姗姗来迟的你...|
+----------------------------------+

['白月光与朱砂痣', '致姗姗来迟的你', '陨落', '踏山河', '花.间.酒', '山海', ...

推荐的歌曲: 山海
"""
15. Combining agg with groupBy
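As a sketch, reusing the user_feed table from section 1 (the column names are assumed from that example): groupBy defines the groups and agg computes one or more aggregates per group.

import pyspark.sql.functions as F

# number of collected beats and latest collect_time per user (assumed columns)
user_stats = (user_feed.groupBy("user_id")
                       .agg(F.count("beat_id").alias("collect_cnt"),
                            F.max("collect_time").alias("last_collect_time")))
user_stats.show(5)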
Part II: Spark Core
2.1 Passing parameters to a UDF:
https://blog.csdn.net/yeshang_lady/article/details/121570361
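A minimal sketch of one common pattern, passing an extra parameter through a closure that returns a udf (make_adder and the value column are made up names):

from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType

def make_adder(n):
    # n is captured by the closure and acts as a "parameter" of the udf
    return udf(lambda x: x + n, IntegerType())

df = spark.createDataFrame([(1,), (2,)], ["value"])
df.withColumn("value_plus_10", make_adder(10)(col("value"))).show()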
2.2 pandas core DataFrame
The RDD API can also be used here.
2.3 RDD operations
RDDs are the core of Spark and come with two kinds of operators (a word-count style sketch follows this list):
【Transformations】map, flatMap, groupByKey, reduceByKey, etc.
【Actions】collect, count, take, top, first, etc.
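A word-count style sketch chaining a few transformations and ending with an action:

rdd = sc.parallelize(["a b", "a c"])
counts = (rdd.flatMap(lambda line: line.split(" "))   # transformation
             .map(lambda w: (w, 1))                   # transformation
             .reduceByKey(lambda a, b: a + b))        # transformation
counts.collect()   # action, e.g. [('a', 2), ('b', 1), ('c', 1)] (order may vary)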
2.4 The filter operation
rdd = sc.parallelize([1, 2, 3, 4, 5, 6])
rdd = rdd.filter(lambda x: x % 2 == 0)
rdd.collect()   # [2, 4, 6]
2.5 flatMap
Applies a function to every element of the RDD and flattens the results.
rdd = sc.parallelize([3, 4, 5])
fm = rdd.flatMap(lambda x: range(1, x))
fm.collect()   # [1, 2, 1, 2, 3, 1, 2, 3, 4]
Part III: MLlib
MLlib (DataFrame-based)
MLlib (RDD-based)
3.1 K-means clustering
The API itself is not hard to use and feels similar to sklearn:
from pyspark.ml.clustering import KMeans

kMeans = KMeans(k=25, seed=1)
model = kMeans.fit(kmeans_data.select('features'))
model.transform(kmeans_data).show(1000)

# inspect the trained model
# does the model have a training summary?
print(model.hasSummary, "\n")
# cluster centers
print(model.clusterCenters(), "\n")
# size of each cluster (number of points it contains)
print(model.summary.clusterSizes)
3.2 GBDT classification and regression
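A minimal sketch (train_df / test_df are assumed DataFrames with a "features" vector column and a "label" column); the DataFrame-based estimators are GBTClassifier and GBTRegressor:

from pyspark.ml.classification import GBTClassifier
from pyspark.ml.regression import GBTRegressor   # same interface for regression

gbt = GBTClassifier(featuresCol="features", labelCol="label", maxIter=20)
model = gbt.fit(train_df)
pred = model.transform(test_df)
pred.select("label", "prediction").show(5)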
Part IV: Recommendation algorithms
4.1 Datagrand data competition: three improved DL algorithms
http://www.360doc.com/content/17/0615/21/16619343_663476259.shtml
【Installing pyspark】
Installing pyspark on Ubuntu: https://zhuanlan.zhihu.com/p/34635519
Installation package on the Apache site: https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
Reference
[1] https://spark.apache.org/docs/latest/api/python/reference/index.html
[2] Data analysis for algorithm engineers: https://zhuanlan.zhihu.com/p/343375787
[3] Several ways to create a table with createDataFrame
[4] Randomly shuffling the rows of a Spark DataFrame
[5] Common DataFrame operations:
0)https://blog.csdn.net/xc_zhou/article/details/118617642
1)https://blog.csdn.net/yeshang_lady/article/details/89528090
2)https://www.jianshu.com/p/acd96549ee15
3)https://www.dandelioncloud.cn/article/details/1441272697576869890