Spark Study Notes
Preface: Today is day 4 of reviewing Spark! This post pulls together the SparkSQL operations most commonly used at work and walks through several enterprise-style cases. I hope it helps!
Tips: "Sharing is a source of joy 💧. My blog holds not just an ocean of knowledge 🌊 but plenty of positive energy 💪 too — come share the joy with me 😊!
If you like my blog, remember to leave a like ❤️ and a follow! Your support keeps me creating!"
(All datasets used in this section are available in my resources download area; interested readers can download them there: the complete SparkSQL case-study dataset collection.)
II. Getting Started with SparkSQL
1. Introduction to SparkSQL
Spark SQL is Spark's module for processing structured data. It provides a programming abstraction called DataFrame and also acts as a distributed SQL query engine.
The four key features of SparkSQL (a minimal sketch combining them follows this list):
- 1-Easy integration: mix SQL queries seamlessly with Spark programs, through the Java, Scala, Python, and R APIs.
- 2-Uniform data access: connect to any data source in the same way.
- 3-Hive compatibility: supports Hive HQL syntax and is compatible with Hive (metastore, SQL syntax, UDFs, serialization/deserialization mechanisms).
- 4-Standard connectivity: connect through industry-standard JDBC or ODBC.
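To make features 1 and 2 concrete, here is a minimal sketch of my own (not part of the original course files; the JSON path is hypothetical). It reads a source through the uniform spark.read.format() API and then queries the same DataFrame with both the DataFrame DSL and SQL in one program:
# -*- coding: utf-8 -*-
# Program function: minimal sketch of "easy integration" and "uniform data access"
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("featureSketch").master("local[*]").getOrCreate()
# uniform reader API: only the format string changes between data sources
df = spark.read.format("json").load("file:///tmp/people.json")  # hypothetical sample file
# DSL query and SQL query over the same DataFrame, mixed freely in one program
df.select("name").show()
df.createOrReplaceTempView("people")
spark.sql("select name from people").show()
spark.stop()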
2. Creating a SparkSession
- 1-spark.read.text(): read a text file
- 2-fileDF.printSchema(): print column names and types
- 3-fileDF.show(truncate=False): display values without truncating long strings
- 4-fileDF.rdd.collect(): view the data as an RDD
_01SparkSessionCreate.py
# -*- coding: utf-8 -*-
# Program function:学会创建SparkSession
from pyspark.sql import SparkSession
from pyspark import SparkConf
if __name__ == '__main__':
# TODO 1-引入SparkSession的环境
conf = SparkConf().setAppName("sparksession").setMaster("local[*]")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# TODO 2-利用Spark环境变量生成SparkContext
sc = spark.sparkContext
# TODO 3-读取一个文件
fileDF = spark.read.text("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/words.txt")
# TODO 4-查看数据有多少行
print("fileDF counts value is:{}".format(fileDF.count())) # fileDF counts value is:2
fileDF.printSchema() # 字段的名称和字段的类型
# root
# |-- value: string (nullable = true)
fileDF.show(truncate=False)
# +------------------------+
# |value |
# +------------------------+
# |hello you Spark Flink |
# |hello me hello she Spark|
# +------------------------+
print(fileDF.rdd.collect())
#[Row(value='hello you Spark Flink'), Row(value='hello me hello she Spark')]
# TODO 5 - stop the SparkSession
spark.stop()
fileDF counts value is:2
root
|-- value: string (nullable = true)
+------------------------+
|value |
+------------------------+
|hello you Spark Flink |
|hello me hello she Spark|
+------------------------+
[Row(value='hello you Spark Flink'), Row(value='hello me hello she Spark')]
3. Two Ways to Convert an RDD to a DataFrame
(1) toDF()
1-Without arguments:
- alias('age'): rename a column
- Row(name='zhangsan', age=18): wrap each record in a Row object
- DSL style: df.select(<DataFrame expressions>).show()
- SQL style: spark.sql(<SQL statement>).show(); remember to create a temp view first with df.createOrReplaceTempView("t_table")
_01rddToDataFrameWay1.py
# -*- coding: utf-8 -*-
# Program function:第一种方式处理rdd转化为df
'''
1-准备好上下文环境SparkSession
2-读取数据,sc.textFile()
3-使用Row对象对每行数据进行操作 Row(name=zhangsan,age=18)
4-使用spark.createDataFrame(schema)创建DataFrame
5-直接使用printSchema查看Scheme
6-使用show展示数据
'''
from pyspark.sql import SparkSession
from pyspark.sql.types import Row
if __name__ == '__main__':
# 1 - 准备好上下文环境SparkSession
spark = SparkSession.builder.master("local[*]").appName("testPi").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("WARN")
# 2 - 读取数据,sc.textFile()
rdd_file = sc.textFile("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/sql/people.txt")
file_map_rdd = rdd_file.map(lambda record: record.split(","))
# print(file__map.collect())
# 3 - 使用Row对象对每行数据进行操作Row(name=zhangsan, age=18)
df = file_map_rdd.map(lambda line: Row(name=line[0], age=int(line[1]))).toDF()
# 4 - SparkSQL提供了两种风格查询数据
# 4-1第一种风格DSL 领域查询语言df.select.filter
print("===================df.select DSL-=========================")
df.select("name").show()
df.select(["name", "age"]).show()
df.select(df.name, (df.age + 10).alias('age')).show()
# 4-2第二种风格SQL 写SQL实现
print("===================spark.sql- SQL=========================")
df.createOrReplaceTempView("t_table")
spark.sql("select * from t_table").show()
spark.sql("select name from t_table").show()
spark.sql("select name,age from t_table").show()
spark.sql("select name,age + 10 from t_table").show()
# 5 - 直接使用printSchema查看Scheme
df.printSchema()
spark.sql("desc t_table").show()
# 6 - 使用show展示数据
df.show()
sc.stop()
===================df.select DSL-=========================
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
+-------+---+
| name|age|
+-------+---+
|Michael| 39|
| Andy| 40|
| Justin| 29|
+-------+---+
===================spark.sql- SQL=========================
+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
+-------+--------------------------+
| name|(age + CAST(10 AS BIGINT))|
+-------+--------------------------+
|Michael| 39|
| Andy| 40|
| Justin| 29|
+-------+--------------------------+
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
| name| string| null|
| age| bigint| null|
+--------+---------+-------+
+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
2-With arguments (pass a list of column names):
_05rddToDataFrameWay4.py
# -*- coding: utf-8 -*-
# Program function:第一种方式处理rdd转化为df
'''
1-准备好上下文环境SparkSession
2-读取数据,sc.textFile()
3-使用StructType和StructFiled创建Schema
4-使用spark.createDataFrame(schema)创建DataFrame
5-直接使用printSchema查看Scheme
6-使用show展示数据
'''
from pyspark.sql import SparkSession
from pyspark.sql.types import *
if __name__ == '__main__':
# 1 - 准备好上下文环境SparkSession
spark = SparkSession.builder.master("local[*]").appName("testPi").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("WARN")
# 2 - 读取数据,sc.textFile()
rdd_file = sc.textFile("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/sql/people.txt")
file_map_rdd = rdd_file.map(lambda record: record.split(","))
#List[str]
df = file_map_rdd.toDF(["name", "age"])
df.printSchema()
df.show()
spark.stop()
(2) spark.createDataFrame(row_rdd)
- scheme_people: the RDD whose records have been mapped to Row objects
_02rddToDataFrameWay1.py
# -*- coding: utf-8 -*-
# Program function:第一种方式处理rdd转化为df
'''
1-准备好上下文环境SparkSession
2-读取数据,sc.textFile()
3-使用Row对象对每行数据进行操作 Row(name=zhangsan,age=18)
4-使用spark.createDataFrame(schema)创建DataFrame
5-直接使用printSchema查看Scheme
6-使用show展示数据
'''
from pyspark.sql import SparkSession
from pyspark.sql.types import Row
if __name__ == '__main__':
# 1 - 准备好上下文环境SparkSession
spark = SparkSession.builder.master("local[*]").appName("testPi").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("WARN")
# 2 - 读取数据,sc.textFile()
rdd_file = sc.textFile("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/sql/people.txt")
file_map_rdd = rdd_file.map(lambda record: record.split(","))
# print(file__map.collect())
# 3 - 使用Row对象对每行数据进行操作Row(name=zhangsan, age=18)
scheme_people = file_map_rdd.map(lambda line: Row(name=line[0], age=int(line[1])))
# 4 - 使用spark.createDataFrame(schema)创建DataFrame
df = spark.createDataFrame(scheme_people)
# 5 - 直接使用printSchema查看Scheme
df.printSchema()
# 6 - 使用show展示数据
df.show()
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
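As a small complementary sketch of my own (not in the original course files): spark.createDataFrame also accepts a plain Python list of Row objects, so for tiny in-memory datasets no RDD is needed at all:
# -*- coding: utf-8 -*-
# Program function: build a DataFrame directly from a list of Row objects
from pyspark.sql import SparkSession, Row

spark = SparkSession.builder.master("local[*]").appName("rowListDemo").getOrCreate()
people = [Row(name="Michael", age=29), Row(name="Andy", age=30), Row(name="Justin", age=19)]
df = spark.createDataFrame(people)  # schema is inferred from the Row fields
df.printSchema()
df.show()
spark.stop()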
(3) spark.createDataFrame(rdd, schema)
1-When the fields have different types:
- peoplerdd: the raw field tuples extracted from the file
- schema: built with StructType and StructField
_03rddToDataFrameWay3.py
# -*- coding: utf-8 -*-
# Program function:第一种方式处理rdd转化为df
'''
1-准备好上下文环境SparkSession
2-读取数据,sc.textFile()
3-使用StructType和StructFiled创建Schema
4-使用spark.createDataFrame(schema)创建DataFrame
5-直接使用printSchema查看Scheme
6-使用show展示数据
'''
from pyspark.sql import SparkSession
from pyspark.sql.types import *
if __name__ == '__main__':
# 1 - 准备好上下文环境SparkSession
spark = SparkSession.builder.master("local[*]").appName("testPi").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("WARN")
# 2 - 读取数据,sc.textFile()
rdd_file = sc.textFile("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/sql/people.txt")
file_map_rdd = rdd_file.map(lambda record: record.split(","))
# print(file__map.collect())
# 3 - 使用Row对象对每行数据进行操作Row(name=zhangsan, age=18)
peoplerdd = file_map_rdd.map(lambda line: (line[0], int(line[1].strip())))
# 使用StructType和StructFiled创建Schema
schema = StructType([StructField("name", StringType(), True), StructField("age", IntegerType(), True)])
# 它的默认值为 True,即表示该字段可以为空。
# 4 - 使用spark.createDataFrame(schema)创建DataFrame
df = spark.createDataFrame(peoplerdd, schema)
# 5 - 直接使用printSchema查看Scheme
df.printSchema()
# root
# | -- name: string(nullable=true)
# | -- age: integer(nullable=true)
# 6 - 使用show展示数据
df.show()
spark.stop()
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
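A shorter variant worth knowing (my own addition, assuming PySpark 2.3+): createDataFrame also accepts the schema as a DDL-formatted string, which avoids the StructType boilerplate. These lines would go before spark.stop() in the script above, reusing its peoplerdd:
# same tuple RDD (name, age) as above, schema given as a DDL string
df2 = spark.createDataFrame(peoplerdd, "name string, age int")
df2.printSchema()
df2.show()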
2-When all fields share the same type (here every column is kept as a string):
_04rddToDataFrameWay31.py
# -*- coding: utf-8 -*-
# Program function:第一种方式处理rdd转化为df
'''
1-准备好上下文环境SparkSession
2-读取数据,sc.textFile()
3-使用StructType和StructFiled创建Schema
4-使用spark.createDataFrame(schema)创建DataFrame
5-直接使用printSchema查看Scheme
6-使用show展示数据
'''
from pyspark.sql import SparkSession
from pyspark.sql.types import *
if __name__ == '__main__':
# 1 - 准备好上下文环境SparkSession
spark = SparkSession.builder.master("local[*]").appName("testPi").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("WARN")
# 2 - 读取数据,sc.textFile()
rdd_file = sc.textFile("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/sql/people.txt")
file_map_rdd = rdd_file.map(lambda record: record.split(","))
# print(file__map.collect())
# 3 - keep both fields as strings here, so the data matches the all-StringType schema below
peoplerdd = file_map_rdd.map(lambda line: (line[0], line[1].strip()))
# build the schema from a comma-separated string of column names, every column as StringType
schemaName = "name,age"
split_ = [StructField(scheme, StringType(), True) for scheme in schemaName.split(",")]
schema = StructType(split_)
# 4 - 使用spark.createDataFrame(schema)创建DataFrame
df = spark.createDataFrame(peoplerdd, schema)
# 5 - 直接使用printSchema查看Scheme
df.printSchema()
# root
# |-- name: string (nullable = true)
# |-- age: string (nullable = true)
# 6 - 使用show展示数据
df.show()
spark.stop()
4. Pandas DataFrame to Spark DataFrame
- spark.createDataFrame(pandas_df)
pandasToDF.py
# -*- coding: utf-8 -*-
# Program function:pandas转化为DF
import pandas as pd
from pyspark.sql import SparkSession
from datetime import datetime, date
if __name__ == '__main__':
spark = SparkSession.builder.appName("readData").master("local[*]").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("WARN")
pandas_df = pd.DataFrame({
'a': [1, 2, 3],
'b': [2., 3., 4.],
'c': ['string1', 'string2', 'string3'],
'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
print(pandas_df)
# a b c d e
# 0 1 2.0 string1 2000-01-01 2000-01-01 12:00:00
# 1 2 3.0 string2 2000-02-01 2000-01-02 12:00:00
# 2 3 4.0 string3 2000-03-01 2000-01-03 12:00:00
print(pandas_df.shape) # (3, 5)
# print(pandas_df.values)
# from an :class:`RDD`, a list or a :class:`pandas.DataFrame`.
df_pandas = spark.createDataFrame(pandas_df)
df_pandas.printSchema()
# root
# | -- a: long(nullable=true)
# | -- b: double(nullable=true)
# | -- c: string(nullable=true)
# | -- d: date(nullable=true)
# | -- e: timestamp(nullable=true)
df_pandas.show()
print(df_pandas.rdd.collect())
spark.stop()
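Going the other way is just as easy (a sketch of my own, assuming PySpark 3.x with the pyarrow package installed): df.toPandas() brings a Spark DataFrame back to pandas, and enabling Arrow speeds up the conversion in both directions. These lines would go just before spark.stop() in the script above:
# use Arrow to accelerate Spark <-> pandas conversion (requires pyarrow)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
pdf_back = df_pandas.toPandas()  # collects all rows to the driver as a pandas DataFrame
print(pdf_back.dtypes)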
5. Three Common Ways to Read Data in Spark
(1) spark.read.text()
(2) spark.read.format()
- 1-Reading CSV data:
  - option("header", True): keep the header row
  - option("sep", ";"): the field separator is ;
  - option("inferSchema", True): automatically infer each column's type
  - load(path): the file location
- 2-Reading JSON data
- 3-Reading Parquet data
_01_readCsvData.py
# -*- coding: utf-8 -*-
# Program function:读取csv数据
# csv 以逗号作为分隔符的文本
from pyspark.sql import SparkSession
if __name__ == '__main__':
spark = SparkSession.builder.appName("readData").master("local[*]").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("WARN")
# 读取csv数据
csv_data=spark.read.format("csv")\
.option("header",True)\
.option("sep",";")\
.option("inferSchema",True)\
.load("/export/data/pyspark_workspace/PySpark-SparkSQL_3.1.2/data/sql/people.csv")
csv_data.printSchema()
csv_data.show()
print(type(csv_data))#<class 'pyspark.sql.dataframe.DataFrame'>
# 读取Json数据
json__load = spark.read.format("json").load("/export/data/pyspark_workspace/PySpark-SparkSQL_3.1.2/data/sql/people.json")
json__load.printSchema()
json__load.show()
# 读取Parquet数据
parquet__load = spark.read.format("parquet").load("/export/data/pyspark_workspace/PySpark-SparkSQL_3.1.2/data/sql/users.parquet")
parquet__load.printSchema()
parquet__load.show()
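For reference, the same three reads can also be written with the shorthand reader methods (a sketch of my own, reusing the spark session and paths from the script above):
csv_data = spark.read.csv("/export/data/pyspark_workspace/PySpark-SparkSQL_3.1.2/data/sql/people.csv",
                          header=True, sep=";", inferSchema=True)
json_data = spark.read.json("/export/data/pyspark_workspace/PySpark-SparkSQL_3.1.2/data/sql/people.json")
parquet_data = spark.read.parquet("/export/data/pyspark_workspace/PySpark-SparkSQL_3.1.2/data/sql/users.parquet")
csv_data.show(2)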
(3) DSL and SQL
- 1-DSL:
  - inspect data: show, printSchema
  - select columns: col, column, or pandas-style syntax
  - aggregate: groupby, count, orderBy, agg
  - rename: withColumnRenamed
- 2-SQL:
  - create a temp view: createOrReplaceTempView
  - run queries: spark.sql(<SQL statement>)
# -*- coding: utf-8 -*-
# Program function:DSL & SQL
from pyspark.sql import SparkSession
if __name__ == '__main__':
# 1-准备环境变量
spark = SparkSession.builder.appName("readData").master("local[*]").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("WARN")
# 2-读取数据
dataDF = spark.read.format("csv") \
.option("header", "true") \
.option("inferSchema", True) \
.option("sep", ";") \
.load("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/sql/people.csv")
# 3-查看数据
dataDF.show(2, truncate=False)
dataDF.printSchema()
# 4-执行DSL的操作
from pyspark.sql.functions import col, column
# 查看name字段的数据
dataDF.select("name").show()
dataDF.select(col("name")).show()
# dataDF.select(column("name")).show()
dataDF.select(dataDF.name).show()
dataDF.select(dataDF["name"]).show()
# 查看name,age字段的数据
dataDF.select(["name", "age"]).show()
dataDF.select(col("name"), col("age")).show()
dataDF.select(dataDF["name"], col("age")).show()
dataDF.select(dataDF.name, col("age")).show()
# filter rows in dataDF whose age is greater than 30
dataDF.filter("age >30").show()
dataDF.filter(dataDF["age"] > 30).show()
dataDF.filter(col("age") > 30).show()
# groupBy统计
dataDF.groupby("age").count().orderBy("count").withColumnRenamed("count", "countBig").show()
from pyspark.sql import functions as F
dataDF.groupby("age").agg(F.count(dataDF.age)).show()
dataDF.groupby("age").agg({"age": "count"}).show()
# SQL
dataDF.createOrReplaceTempView("t_table")
spark.sql("select name from t_table").show()
spark.sql("select name,age from t_table").printSchema()
spark.sql("select Name,age from t_table").printSchema()
# root
# | -- Name: string(nullable=true)
# | -- age: integer(nullable=true)
spark.sql("select name ,age from t_table where age>30").show()
spark.sql("select name ,age from t_table order by age limit 2").show()
spark.stop()
6. Two Ways to Implement WordCount
(1) DSL approach
- 1-explode: flattens an array column into one row per element
- 2-withColumn: replaces the column if it already exists, otherwise adds it
# -*- coding: utf-8 -*-
# Program function:DSL wordcount
from pyspark.sql import SparkSession
if __name__ == '__main__':
# 1-准备环境变量
spark = SparkSession.builder.appName("readData").master("local[*]").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("WARN")
# 2-读取数据
dataDF = spark.read.text("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/words.txt")
# 3-查看数据
dataDF.printSchema()
# root
# | -- value: string(nullable=true)
# 4-wordcount
from pyspark.sql import functions as F
# 这里使用explode爆炸函数将文本数据扁平化处理
# withColumn,如果有相同列调换掉,否则增加列
dataExplodeDF = dataDF.withColumn("words", F.explode(F.split(F.col("value"), " ")))
dataExplodeDF.show(truncate=False)
# +------------------------+-----+
# |value |words|
# +------------------------+-----+
# |hello you Spark Flink |hello|
# |hello you Spark Flink |you |
# |hello you Spark Flink |Spark|
# |hello you Spark Flink |Flink|
# |hello me hello she Spark|hello|
# |hello me hello she Spark|me |
# |hello me hello she Spark|hello|
# |hello me hello she Spark|she |
# |hello me hello she Spark|Spark|
# +------------------------+-----+
dataExplodeDF.groupby("words").count().orderBy("count", ascending=False).show()
# +-----+-----+
# | words | count |
# +-----+-----+
# | hello | 3 |
# | Spark | 2 |
# | me | 1 |
# | Flink | 1 |
# | you | 1 |
# | she | 1 |
# +-----+-----+
(2) SQL approach
- 1-split(value, ' '): the split function
- 2-explode: the explode function, combined with split
# -*- coding: utf-8 -*-
# Program function:DSL wordcount
from pyspark.sql import SparkSession
if __name__ == '__main__':
# 1-准备环境变量
spark = SparkSession.builder.appName("readData").master("local[*]").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("WARN")
# 2-读取数据
dataDF = spark.read.text("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/words.txt")
# 3-查看数据
dataDF.printSchema()
# root
# | -- value: string(nullable=true)
# 4-wordcount
dataDF.createOrReplaceTempView("t_table")
spark.sql("select split(value,' ') from t_table").show(truncate=False)
# +------------------------------+
# |split(value, , -1) |
# +------------------------------+
# |[hello, you, Spark, Flink] |
# |[hello, me, hello, she, Spark]|
# +------------------------------+
spark.sql("select explode(split(value,' ')) as words from t_table").show(truncate=False)
# +-----+
# |words|
# +-----+
# |hello|
# |you |
# |Spark|
# |Flink|
# |hello|
# |me |
# |hello|
# |she |
# |Spark|
# +-----+
spark.sql("""
select words,count(1) as count from
(select explode(split(value,' ')) as words from t_table) w
group by words
order by count desc
""").show(truncate=False)
# +-----+-----+
# | words | count |
# +-----+-----+
# | hello | 3 |
# | Spark | 2 |
# | you | 1 |
# | me | 1 |
# | Flink | 1 |
# | she | 1 |
# +-----+-----+
7. Movie Ratings Case Study
(1) Cleaning the data first
- 1-Read with an RDD, then convert via toDF:
  - filter: each line must be non-empty and must split into exactly 4 fields
  - map: do the actual split
  - map: cast each field to its proper type
  - toDF: assign the column names
# -*- coding: utf-8 -*-
# Program function:对电影数据集的数据接入演示
'''
* 之前做数据的接入:
* 1-查看数据的格式,根据几种方式读取即可
* 2-准备环境
* 3-读取数据文件、
* 4-进行转化DF
* 5-后续统计
'''
from pyspark.sql import SparkSession
from pyspark import SparkConf
import re
if __name__ == '__main__':
# TODO *之前做数据的接入:
# TODO *1 - 查看数据的格式,根据几种方式读取即可--结论:通过rdd方式读取并过滤的操作,之后sc.textFile
# TODO *2 - 准备环境
conf = SparkConf().setAppName("testIris").setMaster("local[*]")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("WARN")
# TODO *3 - 读取数据文件
movies_rdd = sc.textFile("/export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/sql/ml-100k/u.data")
print("movies count is:" + str(movies_rdd.count()))
# TODO 数据清洗
moviesDF = movies_rdd \
.filter(lambda line: (len(line.strip()) > 0 and (len(re.split(r"\s+", line.strip())) == 4))) \
.map(lambda line: re.split(r"\s+", line.strip())) \
.map(lambda line: (int(line[0]), int(line[1]), float(line[2]), int(line[3]))) \
.toDF(["userid", "itemid", "rating", "timestamp"])
moviesDF.show()
moviesDF.printSchema()
# root
# |-- userid: long (nullable = true)
# |-- itemid: long (nullable = true)
# |-- rating: double (nullable = true)
# |-- timestamp: long (nullable = true)
# TODO *4 - 进行转化DF
# TODO *5 - 后续统计
- 2-Read, then define the schema explicitly:
  - StructType and StructField define the fields
  - createDataFrame(data, schema)
# -*- coding: utf-8 -*-
# Program function:对电影数据集的数据接入演示
'''
* 之前做数据的接入:
* 1-查看数据的格式,根据几种方式读取即可
* 2-准备环境
* 3-读取数据文件、
* 4-进行转化DF
* 5-后续统计
'''
from pyspark.sql import SparkSession
from pyspark import SparkConf
import re
if __name__ == '__main__':
# TODO *之前做数据的接入:
# TODO *1 - 查看数据的格式,根据几种方式读取即可--结论:通过rdd方式读取并过滤的操作,之后sc.textFile
# TODO *2 - 准备环境
conf = SparkConf().setAppName("testIris").setMaster("local[*]")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("WARN")
# TODO *3 - 读取数据文件
movies_rdd = sc.textFile("/export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/sql/ml-100k/u.data")
print("movies count is:" + str(movies_rdd.count()))
# TODO 数据清洗
moviesData = movies_rdd \
.filter(lambda line: (len(line.strip()) > 0 and (len(re.split(r"\s+", line.strip())) == 4))) \
.map(lambda line: re.split(r"\s+", line.strip())) \
.map(lambda line: (int(line[0]), int(line[1]), float(line[2]), int(line[3])))
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
movies_schehma = StructType([StructField("userid", IntegerType(), True),
StructField("itemid", IntegerType(), False),
StructField("rating", FloatType(), False),
StructField("timestamp", IntegerType(), False),
])
moviesDF = spark.createDataFrame(moviesData, movies_schehma)
moviesDF.show()
moviesDF.printSchema()
# root
# |-- userid: integer (nullable = true)
# |-- itemid: integer (nullable = false)
# |-- rating: float (nullable = false)
# |-- timestamp: integer (nullable = false)
# TODO *4 - 进行转化DF
# TODO *5 - 后续统计
(2) End-to-end workflow
- 1-Set the number of shuffle partitions: fewer partitions mean less data transfer and disk IO during the shuffle, which can improve performance, but too few partitions can lead to data skew and hurt performance instead; choose the value based on data volume, cluster resources, and the job's needs (a small runtime sketch follows this list, before the full case script).
- 2-Write the results out:
  - coalesce: reduce the number of partitions before writing
  - write: the DataFrame writer API
  - csv: note the output path is a directory and must not be created beforehand
  - mysql: one format and one mode, five options, one save
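A minimal sketch of my own (assuming an existing SparkSession named spark and a placeholder DataFrame some_df with a column "key") showing how to inspect and adjust the shuffle-partition setting at runtime and observe its effect. The full case script follows right after:
spark.conf.set("spark.sql.shuffle.partitions", "2")    # set/override at runtime
print(spark.conf.get("spark.sql.shuffle.partitions"))  # -> 2
grouped = some_df.groupBy("key").count()               # some_df / "key" are placeholders
# partition count after the shuffle; normally equals spark.sql.shuffle.partitions
# (Spark 3.x adaptive query execution may coalesce it further if AQE is enabled)
print(grouped.rdd.getNumPartitions())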
# -*- coding: utf-8 -*-
# Program function:对电影数据集进行按照需求统计分析
'''
* 1-准备SparkSession
* 2-读取ratings.dat数据文件
* 3-转化为DF
* 4-使用DSL实战
* 5-使用SQL实战
* 6-将结果输出
* 7-spark.stop
'''
import time
from pyspark.sql import SparkSession
from pyspark.sql.types import Row
import os
os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3//bin/python"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
if __name__ == '__main__':
# *1 - 准备SparkSession
spark = SparkSession \
.builder \
.appName("testRatings") \
.master("local[3]") \
.config("spark.sql.shuffle.partitions", "2") \
.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("WARN") # 默认用INfo打印很多内容,4040WebUI端口
# *2 - 读取ratings.dat数据文件
ratingsRDD = sc.textFile("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/sql/ml-1m/ratings.dat")
# 数据基础预处理
# *3 - 转化为DF
ratingsDF = ratingsRDD \
.filter(lambda line: len(line.strip()) > 0 and len(line.strip().split("::")) == 4) \
.map(lambda line: line.strip().split("::")) \
.map(lambda p: Row(userid=int(p[0]), moviesId=int(p[1]), ratings=float(p[2]), timestamp=int(p[3]))) \
.toDF()
# *5 - 使用SQL实战
# 对电影评分数据进行统计分析,获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于2000)。
# ratingsDF.printSchema()
# ratingsDF.show(2)
ratingsDF.createOrReplaceTempView("table_view")
spark.sql("""
select moviesId,round(avg(ratings),2) as cnt_rating,count(moviesId) as cnt_movies
from table_view
group by moviesId
having cnt_movies>2000
order by cnt_rating desc,cnt_movies desc
limit 10
""").show()
# +--------+----------+----------+
# |moviesId|cnt_rating|cnt_movies|
# +--------+----------+----------+
# | 318| 4.55| 2227|
# | 858| 4.52| 2223|
# | 527| 4.51| 2304|
# | 1198| 4.48| 2514|
# *4 - 使用DSL实战
import pyspark.sql.functions as F
# resultDF = ratingsDF \
# .select("moviesId", "ratings") \
# .groupby("moviesId") \
# .agg(F.round(F.avg("ratings"), 2).alias("cnt_rating"), F.count("moviesId").alias("cnt_movies")) \
# .filter("cnt_movies >2000") \
# .orderBy(["cnt_rating", "cnt_movies"], ascending=[0, 0]) \
# .limit(10)
# resultDF.show()
# +--------+----------+----------+
# |moviesId|cnt_rating|cnt_movies|
# +--------+----------+----------+
# | 318| 4.55| 2227|
# | 858| 4.52| 2223|
# | 527| 4.51| 2304|
# *6 - 将结果输出到csv
# resultDF\
# .coalesce(1)\
# .write\
# .csv("/export/data/pyspark_workspace/PySpark-SparkSQL_3.1.2/data/output1")
# *6 - 将结果输出到mysql
# 如果没有bigdata数库可以创建一个database,create database bigdata;
# resultDF\
# .coalesce(1)\
# .write\
# .format("jdbc") \
# .mode("overwrite") \
# .option("driver", "com.mysql.jdbc.Driver") \
# .option("url", "jdbc:mysql://node1:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true") \
# .option("dbtable", "bigdata.tb_top10_movies") \
# .option("user", "root") \
# .option("password", "123456") \
# .save()
print("写入MySQL停止")
# *7 - spark.stop
time.sleep(600)
spark.stop()
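To check the (commented-out) MySQL write above, the table can be read back through the same standard JDBC connector before spark.stop() (a sketch of my own; it assumes the same hypothetical node1 MySQL instance, the root/123456 credentials above, and the MySQL JDBC driver jar on the classpath):
jdbc_df = spark.read \
    .format("jdbc") \
    .option("driver", "com.mysql.jdbc.Driver") \
    .option("url", "jdbc:mysql://node1:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true") \
    .option("dbtable", "bigdata.tb_top10_movies") \
    .option("user", "root") \
    .option("password", "123456") \
    .load()
jdbc_df.show()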
8. Data Cleaning Workflow
(1) Deduplication
- 1-createDataFrame: (data, list of column names)
- 2-dropDuplicates: drop rows that are duplicated across all columns; the subset parameter restricts the check to specific columns
- 3-countDistinct: count of distinct values
- 4-count: plain count
- 5-monotonically_increasing_id: generate an increasing id column
01dataClean.py
# -*- coding: utf-8 -*-
# Program function:数据清洗的操作
from pyspark.sql import SparkSession
from pyspark import SparkConf
import re
if __name__ == '__main__':
# TODO *之前做数据的接入:
# TODO *1 - 准备环境
conf = SparkConf().setAppName("testIris").setMaster("local[*]")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("WARN")
# TODO 准备好数据集
df = spark.createDataFrame([
(1, 144.5, 5.9, 33, 'M'),
(2, 167.2, 5.4, 45, 'M'),
(3, 124.1, 5.2, 23, 'F'),
(4, 144.5, 5.9, 33, 'M'),
(5, 133.2, 5.7, 54, 'F'),
(3, 124.1, 5.2, 23, 'F'),
(5, 129.2, 5.3, 42, 'M'),
], ['id', 'weight', 'height', 'age', 'gender'])
# 无意义重复数据去重:数据中行与行完全重复
# 1.首先删除完全一样的记录
print("=================1-删除所有字段重复记录================")
df1 = df.dropDuplicates()
df1.show()
# 2.其次,关键字段值完全一模一样的记录(在这个例子中,是指除了id之外的列一模一样)
print("=================2-删除除了id字段重复记录================")
# print(df1.columns)#['id', 'weight', 'height', 'age', 'gender']
df2 = df1.dropDuplicates(subset=[c for c in df1.columns if c != 'id'])
df2.show()
# 3- 查看某一列是否有重复值,统计id列,及去重id列后去重
print("=================3-查看某一列是否有重复值================")
import pyspark.sql.functions as F
df3 = df2.agg(F.count("id").alias("id_Count"), F.countDistinct("id").alias("id_distinct_count"))
df3.show()
print("=================4-自增的id列================")
df4 = df2.withColumn("new_id", F.monotonically_increasing_id())
df4.show()
#+---+------+------+---+------+-------------+
# | id|weight|height|age|gender| new_id|
# +---+------+------+---+------+-------------+
# | 5| 133.2| 5.7| 54| F| 25769803776|
# | 1| 144.5| 5.9| 33| M| 171798691840|
# | 2| 167.2| 5.4| 45| M| 592705486848|
# | 3| 124.1| 5.2| 23| F|1236950581248|
# | 5| 129.2| 5.3| 42| M|1365799600128|
# +---+------+------+---+------+-------------+
=================2-删除除了id字段重复记录================
+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
| 5| 133.2| 5.7| 54| F|
| 1| 144.5| 5.9| 33| M|
| 2| 167.2| 5.4| 45| M|
| 3| 124.1| 5.2| 23| F|
| 5| 129.2| 5.3| 42| M|
+---+------+------+---+------+
=================3-查看某一列是否有重复值================
+--------+-----------------+
|id_Count|id_distinct_count|
+--------+-----------------+
| 5| 4|
+--------+-----------------+
=================4-自增的id列================
+---+------+------+---+------+-------------+
| id|weight|height|age|gender| new_id|
+---+------+------+---+------+-------------+
| 5| 133.2| 5.7| 54| F| 25769803776|
| 1| 144.5| 5.9| 33| M| 171798691840|
| 2| 167.2| 5.4| 45| M| 592705486848|
| 3| 124.1| 5.2| 23| F|1236950581248|
| 5| 129.2| 5.3| 42| M|1365799600128|
+---+------+------+---+------+-------------+
(2) Handling missing values
- 1-Row plus a list comprehension with sum: count the missing values in each record
- 2-count: count the non-null values in each column
- 3-*: apply an aggregation to every column at once
- 4-missing ratio per column = number of missing values / total number of rows
- 5-splitting the computation: agg with several aggregate expressions
- 6-drop rows with fewer than thresh non-null values: dropna() with the thresh parameter
- 7-drop columns whose missing rate is too high: in practice, build a new DF that simply excludes those columns
# -*- coding: utf-8 -*-
# Program function:数据清洗的操作
from pyspark.sql import SparkSession
from pyspark import SparkConf
import re
if __name__ == '__main__':
# TODO *之前做数据的接入:
# TODO *1 - 准备环境
conf = SparkConf().setAppName("testIris").setMaster("local[*]")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("WARN")
# TODO 准备好数据集
df_miss = spark.createDataFrame([
(1, 143.5, 5.6, 28, 'M', 100000),
(2, 167.2, 5.4, 45, 'M', None),
(3, None, 5.2, None, None, None),
(4, 144.5, 5.9, 33, 'M', None),
(5, 133.2, 5.7, 54, 'F', None),
(6, 124.1, 5.2, None, 'F', None),
(7, 129.2, 5.3, 42, 'M', 76000), ],
['id', 'weight', 'height', 'age', 'gender', 'income'])
# TODO 1:使用列表推导式结合sum函数--计算每条记录的缺失值情况
dfmiss_sum = df_miss.rdd.map(lambda row: (row['id'], sum([c == None for c in row])))
print("diss_Sum value is:", dfmiss_sum.collect())
# diss_Sum value is: [(1, 0), (2, 1), (3, 4), (4, 1), (5, 1), (6, 2), (7, 0)]
# 2.计算各列的非缺失值个数
import pyspark.sql.functions as F
df_miss.agg(F.count("id").alias("id_Count"),
F.count("income").alias("income_count"),
F.count("*").alias("all")).show()
# +--------+------------+
# |id_Count|income_count|
# +--------+------------+
# | 7| 2|
# +--------+------------+
# income has 5 nulls out of 7 rows (5/7 ≈ 71%); the business rule here drops a column once more than 60% of its values are missing
df_miss.agg(*[F.count(t).alias(t + "dismissing") for t in df_miss.columns if t != 'income']).show()
# +------------+----------------+----------------+-------------+----------------+
# |iddismissing|weightdismissing|heightdismissing|agedismissing|genderdismissing|
# +------------+----------------+----------------+-------------+----------------+
# | 7| 6| 7| 5| 6|
# +------------+----------------+----------------+-------------+----------------+
# 如何统计出缺失值的比例
# 方法:1- 非缺失值个数/总数据个数=缺失值比例
df_miss.agg(*[(1 - (F.count(c) / F.count('*'))).alias(c + '_missing') for c in df_miss.columns]).show()
# +----------+------------------+--------------+------------------+------------------+------------------+
# |id_missing| weight_missing|height_missing| age_missing| gender_missing| income_missing|
# +----------+------------------+--------------+------------------+------------------+------------------+
# | 0.0|0.1428571428571429| 0.0|0.2857142857142857|0.1428571428571429|0.7142857142857143|
# +----------+------------------+--------------+------------------+------------------+------------------+
# computing two of the ratios separately (note these are the NON-missing fractions)
df_miss.agg((F.count("weight") / F.count("*")).alias("w_non_missing"),
(F.count("height") / F.count("*")).alias("h_non_missing")).show()
# +------------------+-------------+
# |     w_non_missing|h_non_missing|
# +------------------+-------------+
# |0.8571428571428571|          1.0|
# +------------------+-------------+
# 3、删除缺失值过于严重的列
# 其实是先建一个DF,不要缺失值的列
df_miss_no_income = df_miss.select([c for c in df_miss.columns if c != 'income'])
df_miss_no_income.show()
# +---+------+------+----+------+
# | id|weight|height| age|gender|
# +---+------+------+----+------+
# | 1| 143.5| 5.6| 28| M|
# | 2| 167.2| 5.4| 45| M|
# | 3| null| 5.2|null| null|
# | 4| 144.5| 5.9| 33| M|
# | 5| 133.2| 5.7| 54| F|
# | 6| 124.1| 5.2|null| F|
# | 7| 129.2| 5.3| 42| M|
# +---+------+------+----+------+
# drop rows that have less than `thresh` non-null values.
# i.e. keep only rows with at least `thresh` (here 3) non-null values
df_miss_no_income.dropna(thresh=3).show()
# +---+------+------+----+------+
# | id|weight|height| age|gender|
# +---+------+------+----+------+
# | 1| 143.5| 5.6| 28| M|
# | 2| 167.2| 5.4| 45| M|
# | 4| 144.5| 5.9| 33| M|
# | 5| 133.2| 5.7| 54| F|
# | 6| 124.1| 5.2|null| F|
# | 7| 129.2| 5.3| 42| M|
# +---+------+------+----+------+
# 5、填充缺失值,可以用fillna来填充缺失值,
# 对于bool类型、或者分类类型,可以为缺失值单独设置一个类型,missing
# 对于数值类型,可以用均值或者中位数等填充
# fillna可以接收两种类型的参数:
# 一个数字、字符串,这时整个DataSet中所有的缺失值都会被填充为相同的值。
# 也可以接收一个字典{列名:值}这样
# e.g. for age, compute the mean over the non-null values and then fill the missing ages with it
# means = df_miss_no_income.agg(*[F.mean(c).alias(c) for c in df_miss_no_income.columns if c != 'gender']).toPandas().to_dict('records')[0]
# 案列求解均值
df_miss_no_income.agg(*[F.mean(c).alias(c) for c in df_miss_no_income.columns if c != 'gender']).show()
# +---+------------------+-----------------+----+
# | id| weight| height| age|
# +---+------------------+-----------------+----+
# |4.0|140.28333333333333|5.471428571428571|40.4|
# +---+------------------+-----------------+----+
# 按照数值型的数据求解的均值按列填入缺失值
print(df_miss_no_income.agg(*[F.mean(c).alias(c) for c in df_miss_no_income.columns if c != 'gender']).toPandas())
# id weight height age
# 0 4.0 140.283333 5.471429 40.4
means = df_miss_no_income.agg(
*[F.mean(c).alias(c) for c in df_miss_no_income.columns if c != 'gender']).toPandas().to_dict(orient="records")[
0]
print(means) # [{'id': 4.0, 'weight': 140.283333, 'height': 5.471429, 'age': 40.4}]
# 对于gender需要填入mising
# {'id': 4.0, 'weight': 140.28333333333333, 'height': 5.471428571428571, 'age': 40.4, 'gender': 'missing'}
means["gender"] = "missing"
print(means)
# `value` must be a mapping from column name (string)
df_miss_no_income.fillna(means).show()
# +---+------------------+------+---+-------+
# | id| weight|height|age| gender|
# +---+------------------+------+---+-------+
# | 1| 143.5| 5.6| 28| M|
# | 2| 167.2| 5.4| 45| M|
# | 3|140.28333333333333| 5.2| 40|missing|
# | 4| 144.5| 5.9| 33| M|
# | 5| 133.2| 5.7| 54| F|
# | 6| 124.1| 5.2| 40| F|
# | 7| 129.2| 5.3| 42| M|
# +---+------------------+------+---+-------+
9. Iris Flower Case Study
- 1-Start a SparkSession
- 2-Read the data source
- 3-Convert to a DataFrame
- 4-Run a variety of queries
# -*- coding: utf-8 -*-
# Program function:
'''
1-如果数据文件没有header,直接使用sc.textFile读取
1-1 row对象
1-2 structType方法
2-如果数据文件有header,使用spark.read.format("csv").option("sep")
'''
from pyspark.sql import SparkSession
from pyspark import SparkConf
if __name__ == '__main__':
# 1-启动sparksession
conf = SparkConf().setAppName("testIris").setMaster("local[*]")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("WARN")
# 2-读取数据源
file_rdd = sc.textFile("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/sql/iris.data")
print("数量:",file_rdd.count())
# 数量: 150
# 3- 转化为df
from pyspark.sql.types import Row
irisDF = file_rdd \
.filter(lambda line: len(line.strip()) > 0 and len(line.strip().split(",")) == 5) \
.map(lambda line: line.strip().split(",")) \
.map(lambda x : Row(sepal_length = float(x[0]),sepal_width=float(x[1]), petal_length=float(x[2]),
petal_width=float(x[3]), irisclass=x[4])) \
.toDF()
# 4-执行花式查询操作
irisDF.printSchema()
irisDF.show()
# root
# |-- sepal_length: double (nullable = true)
# |-- sepal_width: double (nullable = true)
# |-- petal_length: double (nullable = true)
# |-- petal_width: double (nullable = true)
# |-- irisclass: string (nullable = true)
#
# +------------+-----------+------------+-----------+-----------+
# |sepal_length|sepal_width|petal_length|petal_width| irisclass|
# +------------+-----------+------------+-----------+-----------+
# | 5.1| 3.5| 1.4| 0.2|Iris-setosa|
# | 4.9| 3.0| 1.4| 0.2|Iris-setosa|
# | 4.7| 3.2| 1.3| 0.2|Iris-setosa|
# | 4.6| 3.1| 1.5| 0.2|Iris-setosa|
# | 5.0| 3.6| 1.4| 0.2|Iris-setosa|
# | 5.4| 3.9| 1.7| 0.4|Iris-setosa|
# | 4.6| 3.4| 1.4| 0.3|Iris-setosa|
# | 5.0| 3.4| 1.5| 0.2|Iris-setosa|
# | 4.4| 2.9| 1.4| 0.2|Iris-setosa|
# | 4.9| 3.1| 1.5| 0.1|Iris-setosa|
# | 5.4| 3.7| 1.5| 0.2|Iris-setosa|
# | 4.8| 3.4| 1.6| 0.2|Iris-setosa|
# | 4.8| 3.0| 1.4| 0.1|Iris-setosa|
# | 4.3| 3.0| 1.1| 0.1|Iris-setosa|
# | 5.8| 4.0| 1.2| 0.2|Iris-setosa|
# | 5.7| 4.4| 1.5| 0.4|Iris-setosa|
# | 5.4| 3.9| 1.3| 0.4|Iris-setosa|
# | 5.1| 3.5| 1.4| 0.3|Iris-setosa|
# | 5.7| 3.8| 1.7| 0.3|Iris-setosa|
# | 5.1| 3.8| 1.5| 0.3|Iris-setosa|
# +------------+-----------+------------+-----------+-----------+
# only showing top 20 rows
irisDF.select("sepal_length").show()
irisDF.select(irisDF.sepal_length).show()
irisDF.select("sepal_length", "sepal_width").show()
# +------------+
# |sepal_length|
# +------------+
# | 5.1|
# | 4.9|
# | 4.7|
# | 4.6|
# | 5.0|
# | 5.4|
# | 4.6|
# | 5.0|
# | 4.4|
# | 4.9|
# | 5.4|
# | 4.8|
# | 4.8|
# | 4.3|
# | 5.8|
# | 5.7|
# | 5.4|
# | 5.1|
# | 5.7|
# | 5.1|
# +------------+
# only showing top 20 rows
#
# +------------+
# |sepal_length|
# +------------+
# | 5.1|
# | 4.9|
# | 4.7|
# | 4.6|
# | 5.0|
# | 5.4|
# | 4.6|
# | 5.0|
# | 4.4|
# | 4.9|
# | 5.4|
# | 4.8|
# | 4.8|
# | 4.3|
# | 5.8|
# | 5.7|
# | 5.4|
# | 5.1|
# | 5.7|
# | 5.1|
# +------------+
# only showing top 20 rows
#
# +------------+-----------+
# |sepal_length|sepal_width|
# +------------+-----------+
# | 5.1| 3.5|
# | 4.9| 3.0|
# | 4.7| 3.2|
# | 4.6| 3.1|
# | 5.0| 3.6|
# | 5.4| 3.9|
# | 4.6| 3.4|
# | 5.0| 3.4|
# | 4.4| 2.9|
# | 4.9| 3.1|
# | 5.4| 3.7|
# | 4.8| 3.4|
# | 4.8| 3.0|
# | 4.3| 3.0|
# | 5.8| 4.0|
# | 5.7| 4.4|
# | 5.4| 3.9|
# | 5.1| 3.5|
# | 5.7| 3.8|
# | 5.1| 3.8|
# +------------+-----------+
# only showing top 20 rows
# 基于统计操作
irisDF.groupby("irisclass").count().show()
# +---------------+-----+
# | irisclass|count|
# +---------------+-----+
# | Iris-virginica| 50|
# | Iris-setosa| 50|
# |Iris-versicolor| 50|
# +---------------+-----+
import pyspark.sql.functions as F
irisDF.groupby("irisclass").agg(F.count(F.col("irisclass")).alias("variable")).show()
# +---------------+--------+
# | irisclass|variable|
# +---------------+--------+
# | Iris-virginica| 50|
# | Iris-setosa| 50|
# |Iris-versicolor| 50|
# +---------------+--------+
# SQL风格
irisDF.createOrReplaceTempView("table_view")
spark.sql("""
select irisclass,count(1) as count
from table_view
group by irisclass
""").show()
# +---------------+-----+
# | irisclass|count|
# +---------------+-----+
# | Iris-virginica| 50|
# | Iris-setosa| 50|
# |Iris-versicolor| 50|
# +---------------+-----+