文章目录
一、PySpark读取HDFS数据(spark.read)
1. 读取HDFS数据
集群主节点IP地址为:192.168.126.10。先初始化SparkSession,Spark master的默认端口是7077。再读取HDFS数据,HDFS的端口是9000,在HDFS系统的/data/目录下存放了三个数据集:ratings.csv,movies.csv,tags.csv。
先读取ratings数据集,该数据集包括4个字段:用户ID(userId),电影ID(movieId),电影评分(rating),时间戳(timestamp)。
由上图可知,数据类型默认为string字符型。具体的数据如下图所示。
代码如下:
from pyspark import SparkConf
from pyspark.ml.feature import StringIndexer
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import*
conf = SparkConf().setAppName("PySpark 的数据读写").setMaster('spark://192.168.126.10:7077')
sc = SparkContext.getOrCreate(conf)
spark = SparkSession(sc)# -----------------HDFS的数据读取-------------------
ratings = spark.read.csv('hdfs://192.168.126.10:9000/data/ratings.csv', header=True)# 数据集包含标题行
ratings.describe().show()# 查看数据集的基本数据信息
ratings.printSchema()# 输出数据集的结构
ratings.show(5)
2.数据处理
2.1 转换数据类型
实际上,电影评分(rating)的数据类型应为双精度型(double),时间戳(timestamp)应为日期型(date)。因此需要转换这两个字段的数据类型。
转换后ratings的数据集结构如下所示:
ratings数据集的前5行数据如下:
代码如下:
# ******** 1.转换数据类型 ********
ratings = ratings.withColumn('rating', ratings.rating.cast('double'))# 转换rating的数据类型为double双精度型,重新命名为ratings列
ratings = ratings.withColumn("date", from_unixtime(ratings.timestamp.cast("bigint"),'yyyy-MM-dd'))# 将timestamp转为日期格式,新增一列date
ratings.show(5)
ratings = ratings.withColumn("date", ratings.date.cast("date"))# 将 date列转换为日期类型date,重新命名为date列
ratings = ratings.drop("timestamp")# 删除timestamp列
ratings.printSchema()# 查看DataFrame的结构
ratings.show(5)
2.2 读取数据时自动识别数据类型
设置参数inferSchema=True,可以实现读取数据时,自动识别数据类型。
代码如下:
# ********* 2.读取数据时自动识别数据类型 ************# 数据集包含标题行, inferSchema=True自动识别数据类型
movies = spark.read.csv('hdfs://192.168.126.10:9000/data/movies.csv', header=True, inferSchema=True)
movies.printSchema()
movies.show(5)
ratings_movies = ratings.join(movies, ratings.movieId == movies.movieId,"inner") \
.select("userId", ratings.movieId,'title','date','rating')
ratings_movies.show(5)
moives数据集有3个字段:movieId(电影ID),title(电影名称),genres(电影类型)。读取的movies数据集的结构如下:
movies数据集的前5行数据如下:
2.3 使用udf函数实现复杂的数据处理逻辑
udf(用户自定义函数,User-Defined Function)是一个非常强大的工具,能将Python函数应用到DataFrame的列上。通过使用udf,实现更复杂的数据处理逻辑。
# ******* 3.使用udf函数实现复杂的数据处理逻辑 *********from pyspark.sql.types import BooleanType
from pyspark.sql.functions import udf
# udf(用户自定义函数,User-Defined Function)是一个非常强大的工具,它允许你将Python函数应用到DataFrame的列上。# 通过使用udf,你可以扩展PySpark SQL的内置函数库,实现更复杂的数据处理逻辑。defisLike(v):# 定义普通的python函数,判断rating列,如果评分大于4,赋值True表示喜欢这部电影,否则复制Falseif v >4:returnTrueelse:returnFalse
udf_isLike = udf(isLike, BooleanType())# 将创建的python函数注册为udf函数,类型为布尔型
ratings_movies = ratings_movies.withColumn("isLike", udf_isLike(ratings_movies["rating"]))# 将rating列应用udf函数,将其分为True/False,结果存到新isLike列
ratings_movies.show(5)
上面的代码中,#定义了一个普通的python函数,判断rating列,如果评分大于4,赋值True表示喜欢这部电影,否则复制False。然后将创建的python函数注册为udf函数,类型为布尔型。最后在rating列应用udf函数,值为True或False,结果存到新建立的isLike列。结果如下所示。
2.4 聚合函数
在 PySpark 中,普通的 udf 函数并不直接支持作为分组聚合函数使用,与 pandas_udf 不同。Spark 会自动地将 pandas_udf 函数应用到每个分组上。 pandas_udf 函数适合分组后的数据处理。
首先读取数据集tags.csv,该数据集包括4个字段:userID,movieID,tag,timestamp,其中tag为用户对电影的评价。
# ******* 4.聚合函数 *************from pyspark.sql.functions import pandas_udf, PandasUDFType
@pandas_udf("string", PandasUDFType.GROUPED_AGG)# 定义一个Pandas UDF,用于在分组聚合时将字符串列表合并为一个以逗号分隔的字符串deffmerge(v):return','.join(v)
tags = spark.read.csv('hdfs://192.168.126.10:9000/data/tags.csv', header=True)# 数据集包含标题行
tags = tags.drop("timestamp")# 删除列timestampprint("删除timestamp后的tags表:")
tags.show(5)# ********groupBy 聚合********
tags_merge = tags.groupBy(["userId","movieId"]).agg(fmerge(tags["tag"]))# 同一个用户对同一个电影的tag放在一起,以逗号隔开print("聚合后:")
tags_merge.show(5)
tags_merge = tags_merge.withColumnRenamed("fmerge(tag)","tags")# 将聚合的列:fmerge(tag), 改为:tagsprint("列改为tags")
tags_merge.show(5)
final_join = ratings_movies.join(tags_merge,(ratings_movies.movieId == tags_merge.movieId)&(
ratings_movies.userId == tags_merge.userId)) \
.select(ratings_movies.userId, ratings_movies.movieId,'title','date','tags','rating','isLike') \
.orderBy(['date'], ascending=False)# 选择
final_join.show(5)
final_join = final_join.filter(ratings.date >'2015-10-25')# 筛选,日期大于2015-10-25
final_join.printSchema()# 显示表的结构
final_join.show(5,False)
上面的代码中,@pandas_udf(“string”, PandasUDFType.GROUPED_AGG) # 定义一个Pandas UDF,用于在分组聚合时将字符串列表合并为一个以逗号分隔的字符串。
根据用户ID和电影ID分组,并应用pandas UDF函数的数据集如下。
由上图可知,经过处理后,数据集有用户ID,电影ID,以及用户对该电影的所有评价(多个评价以逗号分开)。
然后将ratings,moives和tags三表连接,选取部分列,并按照日期升序排序,结果如下:
最后,筛选日期大于2015-10-25的数据。
二、存储数据到HDFS系统
使用了.coalesce(1)来减少分区数到1,这有助于避免生成多个小文件。写入模式mode有覆盖模式overwrite和追加模式append。存储格式有csv、json、parquet等。
coalesce(1) 可能会对大数据集的性能产生负面影响。当以 CSV 或 JSON 格式写入大量数据时,这些格式可能不是最高效的存储选择,因为它们可能不是压缩的,并且可能会消耗更多的磁盘空间。如果正在处理大量数据并且需要高效的存储和检索,需要考虑使用 Parquet 或其他列式存储格式。
# ----------------- 二、存储数据:存放到HDFS系统中------------# 使用了.coalesce(1)来减少分区数到1,这有助于避免生成多个小文件
final_join.coalesce(1).write.format('csv') \
.option('header','true') \
.mode('overwrite') \
.save('hdfs://192.168.126.10:9000/output/movie-out-csv.csv')# csv格式
final_join.coalesce(1).write.format('json') \
.mode('overwrite') \
.save('hdfs://192.168.126.10:9000/output/movie-out-json.json')# json格式
三、存储数据到Linux的MySQL中
1. 写数据到MySQL
先定义连接MySQL数据库的参数,3306为MySQL端口,test为数据库,root为MySQL的用户名,123456为密码,驱动为com.mysql.jdbc.Driver。请注意, .save() 为 第一次建表时使用(假设数据库不存在待创建的表),否则会报错。
# 定义MySQL数据库的连接参数
url ="jdbc:mysql://192.168.126.10:3306/test"
properties ={"user":"root","password":"123456","driver":"com.mysql.jdbc.Driver"}# 写数据到MySQL的test数据库的movie表,无需预先在MySQL创建表。自动在mysql创建movie表
final_join.coalesce(1).write.format('jdbc') \
.option('url', url) \
.option('dbtable','movie') \
.option('user', properties['user']) \
.option('password', properties['password']) \
.option('driver', properties['driver']) \
.option('mode','append') \
# .save() # 第一次建表时使用.save()。append为追加数据模式,overwrite覆盖数据模式print('Spark将数据写入mysql完成。')
2. 封装写数据到MySQL的函数
将写数据到MySQL的代码封装,做成函数方便调用。该函数首先判断MySQL中是否存在待创建的表,存在则先删除表,最后再将数据写入。
# 封装将pyspark的dataframe写入mysql数据库的函数from pyspark.sql import DataFrame
import pymysql
defwrite_to_mysql(df: DataFrame, url:str, properties:dict, table_name:str):"""
将DataFrame写入MySQL数据库的指定表。检查表是否存在,存在则先删除表
参数:
- df: 要写入的DataFrame。
- url: MySQL数据库的JDBC URL。
- properties: 包含数据库连接信息的字典(user, password, driver)。
- table_name: 要写入的MySQL表名。
- mode: 写入模式('append'、'overwrite'等)。
"""
conn = pymysql.connect(host=url.split('/')[2].split(':')[0],
port=int(url.split('/')[2].split(':')[1].split('/')[0]),
user=properties['user'],
password=properties['password'],
database=url.split('/')[-1],
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor)try:with conn.cursor()as cursor:# 检查表是否存在
cursor.execute(f"SHOW TABLES LIKE '{table_name}'")if cursor.fetchone():# 如果表存在,则删除它
cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
conn.commit()print(f"删除表:{table_name}")finally:
conn.close()# 写数据到MySQL
df.coalesce(1).write.format('jdbc') \
.option('url', url) \
.option('dbtable', table_name) \
.option('user', properties['user']) \
.option('password', properties['password']) \
.option('driver', properties['driver']) \
.option('mode','append') \
.save()print(f'Spark将数据写入MySQL的{table_name}表完成。')# -----使用示例---------
url ="jdbc:mysql://192.168.126.10:3306/test"
properties ={"user":"root","password":"123456","driver":"com.mysql.jdbc.Driver"}
write_to_mysql(final_join, url, properties,'movie')# 调用函数,将movie写入MySQL
四、读取MySQL数据
使用spark.read,以jdbc格式读取MySQL数据库。
# 读取MySQL的test数据库的movie表print('Spark读取mysql数据库的数据\n')
movie_read = spark.read \
.format("jdbc") \
.option("url", url) \
.option("dbtable","movie") \
.option("user", properties["user"]) \
.option("password", properties["password"]) \
.option("driver", properties["driver"]) \
.load()
movie_read.show(5)# 如果用mysql客户端查询bit类型数据,可以采用bin查看:select bin(isLike) from movie limit 10;
版权归原作者 侧耳倾听童话 所有, 如有侵权,请联系我们删除。