

PySpark Basics: SparkSQL

Part 1. Quick Start

import findspark
findspark.init()  # run before importing pyspark so that pyspark can be found on sys.path

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# only one SparkContext can be running at a time
sc = spark.sparkContext

| Class | Description |
|---|---|
| StructField(name, dataType[, nullable, metadata]) | Defines a field of a table |
| StructType([fields]) | Defines the structure of a table |
| BinaryType() | Binary |
| ByteType() | Byte |
| BooleanType() | Boolean |
| NullType() | Null |
| FloatType() | Single-precision float |
| DoubleType() | Double-precision float |
| DecimalType([precision, scale]) | Decimal |
| ShortType() | Short integer |
| IntegerType() | Integer |
| LongType() | Long integer |
| StringType() | String |
| DateType() | Date (year, month, day) |
| TimestampType() | Timestamp (date plus hour, minute, second) |
| DayTimeIntervalType([startField, endField]) | Time interval (days plus hours, minutes, seconds) |
| ArrayType(elementType[, containsNull]) | Array (composite type) |
| MapType(keyType, valueType[, valueContainsNull]) | Map (composite type) |
| StructType([fields]) | Nested struct (composite type) |
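As a small, hedged illustration of two entries from the table that the examples below do not exercise (DecimalType and the nullable flag); the schema and values here are made up for demonstration:

from decimal import Decimal
from datetime import date
from pyspark.sql.types import StructType, StructField, StringType, DecimalType, DateType

# DecimalType(10, 2): up to 10 digits of precision with 2 digits after the decimal point
# nullable=False marks the column as non-nullable
price_schema = StructType([
    StructField('item', StringType(), nullable=False),
    StructField('price', DecimalType(10, 2)),
    StructField('sale_date', DateType())])
spark.createDataFrame([('pen', Decimal('3.50'), date(2024, 1, 1))], schema=price_schema).show()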


data = [
 [1, "建国", 1, {'sex': '男', 'age': 17}, ['篮球', '足球'], {'ch': 92, 'en': 75}],
 [2, "建军", 1, {'sex': '男', 'age': 16}, ['排球', '跑步', '爬山'], {'ch': 95, 'en': 78, 'math': 77}],
 [3, "秀兰", 1, {'sex': '女', 'age': 20}, ['阅读', '音乐'], {'math': 81, 'ch': 87, 'en': 76}],
 [4, "秀丽", 2, {'sex': '女', 'age': 18}, ['跳舞'], {'ch': 94, 'math': 90}],
 [5, "翠花", 2, {'sex': '女', 'age': 18}, ['游泳', '瑜伽'], {'en': 70, 'math': 70}],
 [6, "雪梅", 1, {'sex': '女', 'age': 17}, ['美食', '电影'], {'ch': 85, 'en': 83, 'math': 86}],
 [7, "小美", 1, {'sex': '女', 'age': 20}, ['美术'], {'en': 75, 'math': 73}],
 [8, "大山", 1, {'sex': '男', 'age': 18}, ['唱歌'], {'ch': 78, 'en': 71, 'math': 98}]]

Defining the schema: method 1


from pyspark.sql.types import *
from pyspark.sql import types
id_ = StructField('id', LongType())
name = StructField('name', StringType())
class_ = StructField('class', IntegerType())
sex = StructField('sex', StringType())
age = StructField('age', IntegerType())
info = StructField('info', StructType([sex, age]))
interest = StructField('interest', ArrayType(StringType()))
score = StructField('score', MapType(StringType(), IntegerType()))

schema = StructType([id_, name, class_, info, interest, score])
df = spark.createDataFrame(data=data, schema=schema)
df.show()

'''
+---+----+-----+--------+------------------+--------------------+
| id|name|class| info| interest| score|
+---+----+-----+--------+------------------+--------------------+
| 1|建国| 1|{男, 17}| [篮球, 足球]|{ch -> 92, en -> 75}|
| 2|建军| 1|{男, 16}|[排球, 跑步, 爬山]|{en -> 78, math -...|
| 3|秀兰| 1|{女, 20}| [阅读, 音乐]|{en -> 76, math -...|
| 4|秀丽| 2|{女, 18}| [跳舞]|{ch -> 94, math -...|
| 5|翠花| 2|{女, 18}| [游泳, 瑜伽]|{en -> 70, math -...|
| 6|雪梅| 1|{女, 17}| [美食, 电影]|{en -> 83, math -...|
| 7|小美| 1|{女, 20}| [美术]|{en -> 75, math -...|
| 8|大山| 1|{男, 18}| [唱歌]|{en -> 71, math -...|
+---+----+-----+--------+------------------+--------------------+
'''

Defining the schema: method 2

# long is equivalent to bigint
# integer is equivalent to int
schema = """
id long, 
name string, 
class integer, 
info struct<sex:string, age:integer>, 
interest array<string>, 
score map<string,integer>
"""

df = spark.createDataFrame(data=data, schema=schema)
df.show()

'''
+---+----+-----+--------+------------------+--------------------+
| id|name|class| info| interest| score|
+---+----+-----+--------+------------------+--------------------+
| 1|建国| 1|{男, 17}| [篮球, 足球]|{ch -> 92, en -> 75}|
| 2|建军| 1|{男, 16}|[排球, 跑步, 爬山]|{en -> 78, math -...|
| 3|秀兰| 1|{女, 20}| [阅读, 音乐]|{en -> 76, math -...|
| 4|秀丽| 2|{女, 18}| [跳舞]|{ch -> 94, math -...|
| 5|翠花| 2|{女, 18}| [游泳, 瑜伽]|{en -> 70, math -...|
| 6|雪梅| 1|{女, 17}| [美食, 电影]|{en -> 83, math -...|
| 7|小美| 1|{女, 20}| [美术]|{en -> 75, math -...|
| 8|大山| 1|{男, 18}| [唱歌]|{en -> 71, math -...|
+---+----+-----+--------+------------------+--------------------+
'''

Creating DataFrames

# create a PySpark DataFrame from a list of Rows
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row
df = spark.createDataFrame([
 Row(uid=1, name='tony', height=1.73, birth=date(2000, 1, 1), dt=datetime(2020, 1, 1, 12, 0)),
 Row(uid=2, name='jack', height=1.78, birth=date(2000, 2, 1), dt=datetime(2020, 1, 2, 12, 0)),
 Row(uid=3, name='mike', height=1.83, birth=date(2000, 3, 1), dt=datetime(2020, 1, 3, 12, 0))])
df.show()

'''
+---+----+------+----------+-------------------+
|uid|name|height| birth| dt|
+---+----+------+----------+-------------------+
| 1|tony| 1.73|2000-01-01|2020-01-01 12:00:00|
| 2|jack| 1.78|2000-02-01|2020-01-02 12:00:00|
| 3|mike| 1.83|2000-03-01|2020-01-03 12:00:00|
+---+----+------+----------+-------------------+
'''

# create a PySpark DataFrame with an explicit schema
df = spark.createDataFrame([
 (1, 'tony', 1.73, date(2000, 1, 1), datetime(2020, 1, 1, 12, 0)),
 (2, 'jack', 1.78, date(2000, 2, 1), datetime(2020, 1, 2, 12, 0)),
 (3, 'mike', 1.83, date(2000, 3, 1), datetime(2020, 1, 3, 12, 0))],
 schema='uid long, name string, height double, birth date, dt timestamp')
df.show()
'''
+---+----+------+----------+-------------------+
|uid|name|height| birth| dt|
+---+----+------+----------+-------------------+
| 1|tony| 1.73|2000-01-01|2020-01-01 12:00:00|
| 2|jack| 1.78|2000-02-01|2020-01-02 12:00:00|
| 3|mike| 1.83|2000-03-01|2020-01-03 12:00:00|
+---+----+------+----------+-------------------+
'''

# create a PySpark DataFrame from a pandas DataFrame
pandas_df = pd.DataFrame({
 'uid': [1, 2, 3],
 'name': ['tony', 'jack', 'mike'],
 'height': [1.73, 1.78, 1.83],
 'birth': [date(2000, 1, 1),
 date(2000, 2, 1),
 date(2000, 3, 1)],
 'dt': [datetime(2020, 1, 1, 12, 0),
 datetime(2020, 1, 2, 12, 0),
 datetime(2020, 1, 3, 12, 0)]})
df = spark.createDataFrame(pandas_df)
df.show()
'''
+---+----+------+----------+-------------------+
|uid|name|height| birth| dt|
+---+----+------+----------+-------------------+
| 1|tony| 1.73|2000-01-01|2020-01-01 12:00:00|
| 2|jack| 1.78|2000-02-01|2020-01-02 12:00:00|
| 3|mike| 1.83|2000-03-01|2020-01-03 12:00:00|
+---+----+------+----------+-------------------+
'''

# create a PySpark DataFrame from an RDD of tuples
rdd = spark.sparkContext.parallelize([
 (1, 'tony', 1.73, date(2000, 1, 1), datetime(2020, 1, 1, 12, 0)),
 (2, 'jack', 1.78, date(2000, 2, 1), datetime(2020, 1, 2, 12, 0)),
 (3, 'mike', 1.83, date(2000, 3, 1), datetime(2020, 1, 3, 12, 0))])
df = spark.createDataFrame(rdd, schema=['uid', 'name', 'height', 'birth', 'dt'])
df.show()
'''
+---+----+------+----------+-------------------+
|uid|name|height| birth| dt|
+---+----+------+----------+-------------------+
| 1|tony| 1.73|2000-01-01|2020-01-01 12:00:00|
| 2|jack| 1.78|2000-02-01|2020-01-02 12:00:00|
| 3|mike| 1.83|2000-03-01|2020-01-03 12:00:00|
+---+----+------+----------+-------------------+

'''

# the DataFrames created above all have the same contents and schema
# schema: field names, data types, nullability
df.printSchema()
'''
root
 |-- uid: long (nullable = true)
 |-- name: string (nullable = true)
 |-- height: double (nullable = true)
 |-- birth: date (nullable = true)
 |-- dt: timestamp (nullable = true)
'''

Previewing data

df.show(1) # preview the first row
'''
+---+----+------+----------+-------------------+
|uid|name|height| birth| dt|
+---+----+------+----------+-------------------+
| 1|tony| 1.73|2000-01-01|2020-01-01 12:00:00|
+---+----+------+----------+-------------------+
only showing top 1 row
'''

# eagerEval: eager evaluation
# print the DataFrame immediately when it is the cell result (e.g. in notebooks)
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
df

# vertical: display records vertically
df.show(1, vertical=True)

'''
-RECORD 0---------------------
 uid | 1 
 name | tony 
 height | 1.73 
 birth | 2000-01-01 
 dt | 2020-01-01 12:00:00 
only showing top 1 row
'''

# list the column names
df.columns
# ['uid', 'name', 'height', 'birth', 'dt']

# print the schema, similar to pandas df.info()
df.printSchema()
'''
root
 |-- uid: long (nullable = true)
 |-- name: string (nullable = true)
 |-- height: double (nullable = true)
 |-- birth: date (nullable = true)
 |-- dt: timestamp (nullable = true)
'''

# describe: summary statistics
df.select('uid', 'name', 'height').describe().show()
'''
+-------+---+----+------------------+
|summary|uid|name| height|
+-------+---+----+------------------+
| count| 3| 3| 3|
| mean|2.0|null| 1.78|
| stddev|1.0|null|0.0500000000000001|
| min| 1|jack| 1.73|
| max| 3|tony| 1.83|
+-------+---+----+------------------+
'''

# collect: retrieve all rows to the driver
df.collect()
'''
[Row(uid=1, name='tony', height=1.73, birth=datetime.date(2000, 1, 1), dt=datetime.datetime(2020, 1, 1, 12,
0)),
 Row(uid=2, name='jack', height=1.78, birth=datetime.date(2000, 2, 1), dt=datetime.datetime(2020, 1, 2, 12,
0)),
 Row(uid=3, name='mike', height=1.83, birth=datetime.date(2000, 3, 1), dt=datetime.datetime(2020, 1, 3, 12,
0))]
'''

# take: retrieve the first row
df.take(1)
# [Row(uid=1, name='tony', height=1.73, birth=datetime.date(2000, 1, 1), dt=datetime.datetime(2020, 1, 1, 12,0))]

# convert to a pandas DataFrame
df.toPandas()

Selecting and accessing data

# select the name column and display it
df.select(df.name).show()
'''
+----+
|name|
+----+
|tony|
|jack|
|mike|
+----+
'''

from pyspark.sql.functions import upper
# uppercase the name column and add it as a new column upper_name
df.withColumn('upper_name', upper(df.name)).show()
'''
+---+----+------+----------+-------------------+----------+
|uid|name|height| birth| dt|upper_name|
+---+----+------+----------+-------------------+----------+
| 1|tony| 1.73|2000-01-01|2020-01-01 12:00:00| TONY|
| 2|jack| 1.78|2000-02-01|2020-01-02 12:00:00| JACK|
| 3|mike| 1.83|2000-03-01|2020-01-03 12:00:00| MIKE|
+---+----+------+----------+-------------------+----------+
'''

# filter rows where uid equals 1
df.filter(df.uid == 1).show()
'''
+---+----+------+----------+-------------------+
|uid|name|height| birth| dt|
+---+----+------+----------+-------------------+
| 1|tony| 1.73|2000-01-01|2020-01-01 12:00:00|
+---+----+------+----------+-------------------+
'''

Applying functions

**PySpark supports various UDFs and APIs, including calling Python built-in functions and pandas functions.**

UDF: user-defined function
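Before the pandas-based variants below, a minimal sketch of a plain row-at-a-time Python UDF; the to_cm function and the height-to-centimetres conversion are illustrative, not part of the original example:

from pyspark.sql.functions import udf

# an ordinary Python function registered as a UDF; 'double' is the declared return type
@udf('double')
def to_cm(height):
    return height * 100.0

df.select('name', to_cm('height').alias('height_cm')).show()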

import pandas as pd
from pyspark.sql.functions import pandas_udf

Calling pandas functions via pandas_udf

@pandas_udf('long')  # declare the return data type
def pandas_func(col: pd.Series) -> pd.Series:
    # operate on a pandas Series: multiply each element by 100
    return col * 100
df.select(pandas_func(df.height)).show()
'''
+-------------------+
|pandas_func(height)|
+-------------------+
| 173|
| 178|
| 183|
+-------------------+
'''

Calling pandas functions via mapInPandas

# mapInPandas receives an iterator of pandas DataFrames (one per batch)
def pandas_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.uid == 1]
df.mapInPandas(pandas_func, schema=df.schema).show()
'''
+---+----+------+----------+-------------------+
|uid|name|height| birth| dt|
+---+----+------+----------+-------------------+
| 1|tony| 1.73|2000-01-01|2020-01-01 12:00:00|
+---+----+------+----------+-------------------+
'''

Grouping and aggregation

df = spark.createDataFrame([
 [1, 'red', 'banana', 10],
 [2, 'blue', 'banana', 20],
 [3, 'red', 'carrot', 30],
 [4, 'blue', 'grape', 40],
 [5, 'red', 'carrot', 50],
 [6, 'black', 'carrot', 60],
 [7, 'red', 'banana', 70],
 [8, 'red', 'grape', 80]],
 schema=['order_id', 'color', 'fruit', 'amount'])
df.show()
'''
+--------+-----+------+------+
|order_id|color| fruit|amount|
+--------+-----+------+------+
| 1| red|banana| 10|
| 2| blue|banana| 20|
| 3| red|carrot| 30|
| 4| blue| grape| 40|
| 5| red|carrot| 50|
| 6|black|carrot| 60|
| 7| red|banana| 70|
| 8| red| grape| 80|
+--------+-----+------+------+
'''

df.groupby('fruit').agg({'amount': 'sum'}).show()
'''
+------+-----------+
| fruit|sum(amount)|
+------+-----------+
| grape| 120|
|banana| 100|
|carrot| 140|
+------+-----------+
'''

Calling pandas functions via applyInPandas

def pandas_func(pd_df):
    pd_df['total'] = pd_df['amount'].sum()
    return pd_df
schema = 'order_id long, color string, fruit string, amount long, total long'
# group by color, sum amount within each group, and add the result as a total column
df.groupby('color').applyInPandas(pandas_func, schema=schema).show()
'''
+--------+-----+------+------+-----+
|order_id|color| fruit|amount|total|
+--------+-----+------+------+-----+
| 1| red|banana| 10| 240|
| 3| red|carrot| 30| 240|
| 5| red|carrot| 50| 240|
| 7| red|banana| 70| 240|
| 8| red| grape| 80| 240|
| 6|black|carrot| 60| 60|
| 2| blue|banana| 20| 60|
| 4| blue| grape| 40| 60|
+--------+-----+------+------+-----+
'''

df1 = spark.createDataFrame(
 data=[(1, 'tony'),
 (2, 'jack'),
 (3, 'mike'),
 (4, 'rose')],
 schema=('id', 'name'))
df2 = spark.createDataFrame(
 data=[(1, '二班'),
 (2, '三班')],
 schema=('id', 'class'))
def left_join(df_left, df_right):
    # merge: left-join the two pandas DataFrames on id
    return pd.merge(df_left, df_right, how='left', on='id')
# cogroup: group both DataFrames by key, then combine the matching groups
schema = 'id int, name string, class string'
df1.groupby('id') \
 .cogroup(df2.groupby('id')) \
 .applyInPandas(left_join, schema=schema).show()
'''
+---+----+-----+
| id|name|class|
+---+----+-----+
| 1|tony| 二班|
| 3|mike| null|
| 2|jack| 三班|
| 4|rose| null|
+---+----+-----+
'''

Data import and export

# CSV is simple and easy to use; Parquet and ORC are high-performance file formats with faster reads and writes
# many other data sources are available in PySpark, such as JDBC, text, binaryFile, Avro, etc.
df.write.csv('fruit.csv', header=True)
spark.read.csv('fruit.csv', header=True).show()
'''
+--------+-----+------+------+
|order_id|color| fruit|amount|
+--------+-----+------+------+
| 5| red|carrot| 50|
| 6|black|carrot| 60|
| 1| red|banana| 10|
| 2| blue|banana| 20|
| 3| red|carrot| 30|
| 4| blue| grape| 40|
| 7| red|banana| 70|
| 8| red| grape| 80|
+--------+-----+------+------+
'''

# Parquet
df.write.parquet('fruit.parquet')
spark.read.parquet('fruit.parquet').show()
'''
+--------+-----+------+------+
|order_id|color| fruit|amount|
+--------+-----+------+------+
| 5| red|carrot| 50|
| 6|black|carrot| 60|
| 1| red|banana| 10|
| 2| blue|banana| 20|
| 7| red|banana| 70|
| 8| red| grape| 80|
| 3| red|carrot| 30|
| 4| blue| grape| 40|
+--------+-----+------+------+
'''

# ORC
df.write.orc('fruit.orc')
spark.read.orc('fruit.orc').show()
'''
+--------+-----+------+------+
|order_id|color| fruit|amount|
+--------+-----+------+------+
| 7| red|banana| 70|
| 8| red| grape| 80|
| 5| red|carrot| 50|
| 6|black|carrot| 60|
| 1| red|banana| 10|
| 2| blue|banana| 20|
| 3| red|carrot| 30|
| 4| blue| grape| 40|
+--------+-----+------+------+
'''

SparkSQL

# DataFrames and Spark SQL share the same execution engine, so they can be used interchangeably
# a DataFrame can be registered as a temporary view and queried with SQL
df.createOrReplaceTempView('tableA')
spark.sql('select count(*) from tableA').show()
'''
+--------+
|count(1)|
+--------+
| 8|
+--------+
'''

# UDFs can be registered and then called directly from SQL
@pandas_udf('float')
def pandas_func(col: pd.Series) -> pd.Series:
 return col * 0.1
# register the UDF for use in SQL
spark.udf.register('pandas_func', pandas_func)
spark.sql('select pandas_func(amount) from tableA').show()
'''
+-------------------+
|pandas_func(amount)|
+-------------------+
| 1.0|
| 2.0|
| 3.0|
| 4.0|
| 5.0|
| 6.0|
| 7.0|
| 8.0|
+-------------------+
'''

from pyspark.sql.functions import expr
# expr: evaluate a SQL expression string
df.selectExpr('pandas_func(amount)').show()
df.select(expr('pandas_func(amount)')).show()
'''
+-------------------+
|pandas_func(amount)|
+-------------------+
| 1.0|
| 2.0|
| 3.0|
| 4.0|
| 5.0|
| 6.0|
| 7.0|
| 8.0|
+-------------------+
+-------------------+
|pandas_func(amount)|
+-------------------+
| 1.0|
| 2.0|
| 3.0|
| 4.0|
| 5.0|
| 6.0|
| 7.0|
| 8.0|
+-------------------+
'''

Word count

rdd = sc.textFile('./data/word.txt')
rdd.collect()
'''
['hello world',
 'hello python',
 'hello hadoop',
 'hello hive',
 'hello spark',
 'spark and jupyter',
 'spark and python',
 'spark and pandas',
 'spark and sql']
'''

rdd_word = rdd.flatMap(lambda x: x.split(' '))
rdd_one = rdd_word.map(lambda x: (x, 1))
rdd_count = rdd_one.reduceByKey(lambda x, y: x+y)
rdd_count.collect()
'''
[('world', 1),
 ('python', 2),
 ('hadoop', 1),
 ('hive', 1),
 ('jupyter', 1),
 ('pandas', 1),
 ('hello', 5),
 ('spark', 5),
 ('and', 4),
 ('sql', 1)] 
'''
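For comparison, a sketch of the same word count written with the DataFrame API instead of RDD transformations (same word.txt file assumed):

from pyspark.sql.functions import explode, split

lines = spark.read.text('./data/word.txt')   # a single column named "value"
words = lines.select(explode(split('value', ' ')).alias('word'))
words.groupBy('word').count().show()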

Part 2. SparkSQL: Creating DataFrames

import findspark
findspark.init()  # run before importing pyspark

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("test") \
    .master("local[*]") \
    .enableHiveSupport() \
    .getOrCreate()
sc = spark.sparkContext

1. Via the toDF method

  • Convert an RDD to a DataFrame
rdd = sc.parallelize(
    [("jack", 20),
     ("rose", 18)])
df = rdd.toDF(schema=['name','age'])
df.show()
'''
+----+---+
|name|age|
+----+---+
|jack| 20|
|rose| 18|
+----+---+
'''
  • Convert a DataFrame to an RDD
rdd = df.rdd
rdd.collect()
# [Row(name='jack', age=20), Row(name='rose', age=18)]

2. Via the createDataFrame method (from a pandas DataFrame)

  • Convert a pandas DataFrame to a PySpark DataFrame
import pandas as pd
pd_df = pd.DataFrame(
    data=[("jack", 20),
          ("rose", 18)],
    columns=["name", "age"])
pd_df.head()

df = spark.createDataFrame(pd_df)
df.show()
'''
+----+---+
|name|age|
+----+---+
|jack| 20|
|rose| 18|
+----+---+
'''
  • Convert a PySpark DataFrame to a pandas DataFrame
pd_df = df.toPandas()
pd_df

3. Via the createDataFrame method (from a Python list)

  • Convert a Python list to a PySpark DataFrame
data = [("jack", 20),
        ("rose", 18)]

df = spark.createDataFrame(data=data,schema=['name','age'])
df.show()
'''
+----+---+
|name|age|
+----+---+
|jack| 20|
|rose| 18|
+----+---+
'''

4. Via the createDataFrame method (from an RDD with a schema)

  • Create a DataFrame from an RDD with an explicit schema
from datetime import datetime
data = [("jack", 20, datetime(2001, 1, 10)),
        ("rose", 18, datetime(2003, 2, 20)),
        ("tom", 20, datetime(2004, 3, 30))]
rdd = sc.parallelize(data)
rdd.collect()
'''
[('jack', 20, datetime.datetime(2001, 1, 10, 0, 0)),
 ('rose', 18, datetime.datetime(2003, 2, 20, 0, 0)),
 ('tom', 20, datetime.datetime(2004, 3, 30, 0, 0))]
'''

schema = 'name string, age int, birth date'
df = spark.createDataFrame(data=rdd,schema=schema)
df.show()
'''
+----+---+----------+
|name|age|     birth|
+----+---+----------+
|jack| 20|2001-01-10|
|rose| 18|2003-02-20|
| tom| 20|2004-03-30|
+----+---+----------+
'''

df.printSchema()
'''
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- birth: date (nullable = true)
'''

5. Creating DataFrames by reading files

  • Read JSON files, CSV files, txt files, Parquet files, Hive tables, MySQL tables, etc.

5.1 Reading a JSON file

df = spark.read.json('./data/iris.json')
df.show(5)
'''
+-----+------------+-----------+------------+-----------+
|label|petal_length|petal_width|sepal_length|sepal_width|
+-----+------------+-----------+------------+-----------+
|    0|         1.4|        0.2|         5.1|        3.5|
|    0|         1.4|        0.2|         4.9|        3.0|
|    0|         1.3|        0.2|         4.7|        3.2|
|    0|         1.5|        0.2|         4.6|        3.1|
|    0|         1.4|        0.2|         5.0|        3.6|
+-----+------------+-----------+------------+-----------+
'''

df.printSchema()
'''
root
 |-- label: long (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
'''

5.2 Reading a CSV file

# inferSchema=True infers the data type of each column
df = spark.read.csv('./data/iris.csv', inferSchema=True, header=True)
df.show(5)
df.printSchema()
'''
+------------+-----------+------------+-----------+-----+
|sepal_length|sepal_width|petal_length|petal_width|label|
+------------+-----------+------------+-----------+-----+
|         5.1|        3.5|         1.4|        0.2|    0|
|         4.9|        3.0|         1.4|        0.2|    0|
|         4.7|        3.2|         1.3|        0.2|    0|
|         4.6|        3.1|         1.5|        0.2|    0|
|         5.0|        3.6|         1.4|        0.2|    0|
+------------+-----------+------------+-----------+-----+
only showing top 5 rows

root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- label: integer (nullable = true)
'''

5.3 Reading a txt file

df = spark.read.csv('data/iris.txt',sep=',',inferSchema=True,header=True)
df.show(5)
df.printSchema()
'''
+------------+-----------+------------+-----------+-----+
|sepal_length|sepal_width|petal_length|petal_width|label|
+------------+-----------+------------+-----------+-----+
|         5.1|        3.5|         1.4|        0.2|    0|
|         4.9|        3.0|         1.4|        0.2|    0|
|         4.7|        3.2|         1.3|        0.2|    0|
|         4.6|        3.1|         1.5|        0.2|    0|
|         5.0|        3.6|         1.4|        0.2|    0|
+------------+-----------+------------+-----------+-----+
only showing top 5 rows

root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- label: integer (nullable = true)
'''

5.4 Reading a Parquet file

df = spark.read.parquet('data/iris.parquet')
df.show(5)
'''
+-----+------------+-----------+------------+-----------+
|label|petal_length|petal_width|sepal_length|sepal_width|
+-----+------------+-----------+------------+-----------+
|    0|         1.4|        0.2|         5.1|        3.5|
|    0|         1.4|        0.2|         4.9|        3.0|
|    0|         1.3|        0.2|         4.7|        3.2|
|    0|         1.5|        0.2|         4.6|        3.1|
|    0|         1.4|        0.2|         5.0|        3.6|
+-----+------------+-----------+------------+-----------+
only showing top 5 rows

'''

5.5 Reading a Hive table

# create a database
spark.sql('create database edu')

# create a table
spark.sql(
    '''
    create table edu.course(
        c_id int,
        c_name string,
        t_id int
    )
    row format delimited
    fields terminated by ','
    '''
)

# load local data into the table
spark.sql("load data local inpath 'data/course.csv' into table edu.course")

# read the data (if it cannot be accessed, the metastore_db and spark-warehouse directories may need their permissions changed to shared)
df = spark.sql('select * from edu.course')
df.show(5)

'''
+----+------+----+
|c_id|c_name|t_id|
+----+------+----+
|   1|  语文|   2|
|   2|  数学|   1|
|   3|  英语|   3|
|   4|  政治|   5|
|   5|  历史|   4|
+----+------+----+
only showing top 5 rows
'''

5.6 Reading a MySQL table

  • The MySQL Connector/J version must match your MySQL version
  • After unzipping, take mysql-connector-java-8.0.25\mysql-connector-java-8.0.25.jar
  • and copy it into the Spark installation's Spark\jars directory
  • MySQL :: Download MySQL Connector/J (Archived Versions)
# localhost: the connection host
# ?serverTimezone=GMT sets the session time zone; UTC is Coordinated Universal Time, while Beijing time (UTC+8) is eight hours ahead of it
url = 'jdbc:mysql://localhost:3306/db?serverTimezone=GMT'
df = (
    spark.read.format('jdbc')
    .option('url', url)
    .option('dbtable','order_info')
    .option('user','root')
    .option('password','root')
    .load()
)
df.show(5)
'''
+----------+-------+----------+--------+------------+----------+-------------------+-------------------+
|  order_id|user_id|product_id|platform|order_amount|pay_amount|         order_time|           pay_time|
+----------+-------+----------+--------+------------+----------+-------------------+-------------------+
|SYS1000001|U157213|   P000064|     App|      272.51|    272.51|2018-02-14 12:20:36|2018-02-14 12:31:28|
|SYS1000002|U191121|   P000583|  Wechat|      337.93|       0.0|2018-08-14 09:40:34|2018-08-14 09:45:23|
|SYS1000003|U211918|   P000082|  Wechat|      905.68|    891.23|2018-11-02 20:17:25|2018-11-02 20:31:41|
|SYS1000004|U201322|   P000302|     Web|      786.27|    688.88|2018-11-19 10:36:39|2018-11-19 10:51:14|
|SYS1000005|U120872|   P000290|     App|      550.77|    542.51|2018-12-26 11:19:16|2018-12-26 11:27:09|
+----------+-------+----------+--------+------------+----------+-------------------+-------------------+
only showing top 5 rows
'''
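The original only reads from MySQL; as a sketch, the corresponding write path uses the same JDBC options (the target table name order_info_copy is hypothetical):

(
    df.write.format('jdbc')
    .option('url', url)
    .option('dbtable', 'order_info_copy')   # hypothetical target table
    .option('user', 'root')
    .option('password', 'root')
    .mode('append')
    .save()
)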

Part 3. SparkSQL: Exporting DataFrames

import findspark
findspark.init()  # run before importing pyspark

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("test") \
    .master("local[*]") \
    .enableHiveSupport() \
    .getOrCreate()
sc = spark.sparkContext

1. Reading a CSV file

df = spark.read.csv(
    path="data/iris.csv",
    sep=',',
    inferSchema=True,
    header=True)
df.show(5)
'''
+------------+-----------+------------+-----------+-----+
|sepal_length|sepal_width|petal_length|petal_width|label|
+------------+-----------+------------+-----------+-----+
|         5.1|        3.5|         1.4|        0.2|    0|
|         4.9|        3.0|         1.4|        0.2|    0|
|         4.7|        3.2|         1.3|        0.2|    0|
|         4.6|        3.1|         1.5|        0.2|    0|
|         5.0|        3.6|         1.4|        0.2|    0|
+------------+-----------+------------+-----------+-----+
only showing top 5 rows
'''
  • Save as CSV files, txt files, JSON files, Parquet files, or Hive tables

2. Saving as a CSV file

# raises an exception if the path already exists
df.write.csv('data/df_csv')
# the output is written to a directory; the CSV part-file names are generated automatically
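If the path may already exist, a sketch that overwrites it and forces a single output part-file (coalesce(1) trades parallelism for a single file):

# mode('overwrite') replaces the existing directory
df.coalesce(1).write.mode('overwrite').csv('data/df_csv', header=True)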

3. Saving as a txt file

# write.text only supports a DataFrame with a single column of string type
# df.write.text('data/df_text')

df.rdd.saveAsTextFile('data/df_text')
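To stay in the DataFrame API instead of dropping to the RDD, a sketch that concatenates all columns into one string column and then uses write.text; the output path data/df_text2 is made up:

from pyspark.sql import functions as f

# build a single comma-separated string column, then write it as text
(
    df.select(f.concat_ws(',', *[f.col(c).cast('string') for c in df.columns]).alias('value'))
    .write.mode('overwrite')
    .text('data/df_text2')
)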

4. Saving as a JSON file

df.write.json('data/df_json')

5. Saving as a Parquet file

# small storage footprint and fast loading
# unpartitioned: written into a single directory without sub-folders
df.write.parquet('data/df_parquet')

# partitioned: one sub-folder per partition value
df.write.partitionBy('label').format('parquet').save('data/df_parquet_par')   # partition the output by the label (species) column

6. Saving as a Hive table

df.show(5)
'''
+------------+-----------+------------+-----------+-----+
|sepal_length|sepal_width|petal_length|petal_width|label|
+------------+-----------+------------+-----------+-----+
|         5.1|        3.5|         1.4|        0.2|    0|
|         4.9|        3.0|         1.4|        0.2|    0|
|         4.7|        3.2|         1.3|        0.2|    0|
|         4.6|        3.1|         1.5|        0.2|    0|
|         5.0|        3.6|         1.4|        0.2|    0|
+------------+-----------+------------+-----------+-----+
only showing top 5 rows
'''

spark.sql('create database db')   # create the database

df.write.bucketBy(3, 'label').sortBy('sepal_length').saveAsTable('db.iris')   # bucket by label, sort by sepal_length, and save to the iris table in database db
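To check the result, a sketch that reads the saved table back:

spark.table('db.iris').show(5)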

Part 4. SparkSQL: Interacting with DataFrames via the API

1) Data preview

import findspark
findspark.init()  # run before importing pyspark

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("test") \
    .master("local[*]") \
    .enableHiveSupport() \
    .getOrCreate()
sc = spark.sparkContext

1. Reading the data

# inferSchema infers the data types
df = spark.read.csv("data/order.csv", header=True, inferSchema=True)

2. Row-wise preview

  • Preview the first row
df.first()
# Row(order_id=10021265709, order_date='2010-10-13', region='华北', province='河北',product_type='办公用品', price=38.94, quantity=6, profit=-213.25)
  • Preview the first 3 rows
df.head(3)
'''
[Row(order_id=10021265709, order_date='2010-10-13', region='华北', province='河北', product_type='办公用品', price=38.94, quantity=6, profit=-213.25),
 Row(order_id=10021250753, order_date='2012-02-20', region='华南', province='河南', product_type='办公用品', price=2.08, quantity=2, profit=-4.64),
 Row(order_id=10021257699, order_date='2011-07-15', region='华南', province='广东', product_type='家具产品', price=107.53, quantity=26, profit=1054.82)]
'''

df.show(3)
'''
+-----------+----------+------+--------+------------+------+--------+-------+
|   order_id|order_date|region|province|product_type| price|quantity| profit|
+-----------+----------+------+--------+------------+------+--------+-------+
|10021265709|2010-10-13|  华北|    河北|    办公用品| 38.94|       6|-213.25|
|10021250753|2012-02-20|  华南|    河南|    办公用品|  2.08|       2|  -4.64|
|10021257699|2011-07-15|  华南|    广东|    家具产品|107.53|      26|1054.82|
+-----------+----------+------+--------+------------+------+--------+-------+
only showing top 3 rows
'''
  • Preview the last 3 rows
df.tail(3)
'''
[Row(order_id=10021249146, order_date='2012-11-15', region='华南', province='广东', product_type='数码电子', price=115.99, quantity=10, profit=-105.7),
 Row(order_id=10021687707, order_date='2009-01-23', region='华南', province='广东', product_type='数码电子', price=73.98, quantity=18, profit=-23.44),
 Row(order_id=10021606289, order_date='2011-05-27', region='华南', province='广东', product_type='办公用品', price=9.71, quantity=20, profit=-113.52)]
'''
  • Preview all rows
df.collect()

3. Tabular preview

df.show(5)   # show defaults to the first 20 rows
'''
+-----------+----------+------+--------+------------+------+--------+--------+
|   order_id|order_date|region|province|product_type| price|quantity|  profit|
+-----------+----------+------+--------+------------+------+--------+--------+
|10021265709|2010-10-13|  华北|    河北|    办公用品| 38.94|       6| -213.25|
|10021250753|2012-02-20|  华南|    河南|    办公用品|  2.08|       2|   -4.64|
|10021257699|2011-07-15|  华南|    广东|    家具产品|107.53|      26| 1054.82|
|10021258358|2011-07-15|  华北|  内蒙古|    家具产品| 70.89|      24|-1748.56|
|10021249836|2011-07-15|  华北|  内蒙古|    数码电子|  7.99|      23|  -85.13|
+-----------+----------+------+--------+------------+------+--------+--------+
only showing top 5 rows
'''

4. Viewing the schema

# view field names, data types, nullability, and other metadata
df.printSchema()
'''
root
 |-- order_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- region: string (nullable = true)
 |-- province: string (nullable = true)
 |-- product_type: string (nullable = true)
 |-- price: double (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- profit: double (nullable = true)

'''

5. Viewing the column list

df.columns
'''
['order_id',
 'order_date',
 'region',
 'province',
 'product_type',
 'price',
 'quantity',
 'profit']
'''

6. Viewing column data types

df.dtypes
'''
[('order_id', 'bigint'),
 ('order_date', 'string'),
 ('region', 'string'),
 ('province', 'string'),
 ('product_type', 'string'),
 ('price', 'double'),
 ('quantity', 'int'),
 ('profit', 'double')]
'''

7. Viewing the row count

df.count()
# 8567

8. Viewing the input file paths

df.inputFiles()
# ['file:///D:/桌面/PySpark/PySpark/data/order.csv']

2) SQL-like basic operations

  • Add columns, drop columns, rename, sort, drop duplicates, drop missing values, and other operations on a Spark DataFrame
df = spark.read.csv("data/order.csv", header=True, inferSchema=True)
df.show(5)
'''
+-----------+----------+------+--------+------------+------+--------+--------+
|   order_id|order_date|region|province|product_type| price|quantity|  profit|
+-----------+----------+------+--------+------------+------+--------+--------+
|10021265709|2010-10-13|  华北|    河北|    办公用品| 38.94|       6| -213.25|
|10021250753|2012-02-20|  华南|    河南|    办公用品|  2.08|       2|   -4.64|
|10021257699|2011-07-15|  华南|    广东|    家具产品|107.53|      26| 1054.82|
|10021258358|2011-07-15|  华北|  内蒙古|    家具产品| 70.89|      24|-1748.56|
|10021249836|2011-07-15|  华北|  内蒙古|    数码电子|  7.99|      23|  -85.13|
+-----------+----------+------+--------+------------+------+--------+--------+
only showing top 5 rows
'''

1. Selecting columns

# df.select('*').show()  # select all columns
# select specific columns
# df.select('order_id','region','province','quantity').show(5)
# df['order_id','region','province','quantity'].show(5)
# df[['order_id','region','province','quantity']].show(5)
# df.select(df.order_id,df.region,df.province,df.quantity).show(5)
df.select(df['order_id'],df['region'],df['province'],df['quantity']).show(5)
'''
+-----------+------+--------+--------+
|   order_id|region|province|quantity|
+-----------+------+--------+--------+
|10021265709|  华北|    河北|       6|
|10021250753|  华南|    河南|       2|
|10021257699|  华南|    广东|      26|
|10021258358|  华北|  内蒙古|      24|
|10021249836|  华北|  内蒙古|      23|
+-----------+------+--------+--------+
only showing top 5 rows
'''

# select columns whose names match a regular expression
df.select(df.colRegex('`order.*`')).show(5)   # the regular expression must be wrapped in backticks
'''
+-----------+----------+
|   order_id|order_date|
+-----------+----------+
|10021265709|2010-10-13|
|10021250753|2012-02-20|
|10021257699|2011-07-15|
|10021258358|2011-07-15|
|10021249836|2011-07-15|
+-----------+----------+
only showing top 5 rows
'''

2. Adding columns

# amount = price * quantity
df = df.withColumn('amount',df['price']*df['quantity'])
df.show(5)
'''
+-----------+----------+------+--------+------------+------+--------+--------+------------------+--------+
|   order_id|order_date|region|province|product_type| price|quantity|  profit|            amount|discount|
+-----------+----------+------+--------+------------+------+--------+--------+------------------+--------+
|10021265709|2010-10-13|  华北|    河北|    办公用品| 38.94|       6| -213.25|            233.64|     0.5|
|10021250753|2012-02-20|  华南|    河南|    办公用品|  2.08|       2|   -4.64|              4.16|     0.5|
|10021257699|2011-07-15|  华南|    广东|    家具产品|107.53|      26| 1054.82|           2795.78|     0.5|
|10021258358|2011-07-15|  华北|  内蒙古|    家具产品| 70.89|      24|-1748.56|1701.3600000000001|     0.5|
|10021249836|2011-07-15|  华北|  内蒙古|    数码电子|  7.99|      23|  -85.13|            183.77|     0.5|
+-----------+----------+------+--------+------------+------+--------+--------+------------------+--------+
only showing top 5 rows
'''

# discount = 0.5: a literal column where every value is 0.5
from pyspark.sql import functions as f
df = df.withColumn('discount',f.lit(0.5))
df.show(5)
'''
+-----------+----------+------+--------+------------+------+--------+--------+------------------+--------+
|   order_id|order_date|region|province|product_type| price|quantity|  profit|            amount|discount|
+-----------+----------+------+--------+------------+------+--------+--------+------------------+--------+
|10021265709|2010-10-13|  华北|    河北|    办公用品| 38.94|       6| -213.25|            233.64|     0.5|
|10021250753|2012-02-20|  华南|    河南|    办公用品|  2.08|       2|   -4.64|              4.16|     0.5|
|10021257699|2011-07-15|  华南|    广东|    家具产品|107.53|      26| 1054.82|           2795.78|     0.5|
|10021258358|2011-07-15|  华北|  内蒙古|    家具产品| 70.89|      24|-1748.56|1701.3600000000001|     0.5|
|10021249836|2011-07-15|  华北|  内蒙古|    数码电子|  7.99|      23|  -85.13|            183.77|     0.5|
+-----------+----------+------+--------+------------+------+--------+--------+------------------+--------+
only showing top 5 rows
'''

# compute the discounted amount and preview the result
df.select('amount','discount',df['amount']*df['discount']).show(5)
'''
+------------------+--------+-------------------+
|            amount|discount|(amount * discount)|
+------------------+--------+-------------------+
|            233.64|     0.5|             116.82|
|              4.16|     0.5|               2.08|
|           2795.78|     0.5|            1397.89|
|1701.3600000000001|     0.5|  850.6800000000001|
|            183.77|     0.5|             91.885|
+------------------+--------+-------------------+
only showing top 5 rows
'''

3. Dropping columns

df.show(5)
'''
+-----------+----------+------+--------+------------+------+--------+--------+------------------+--------+
|   order_id|order_date|region|province|product_type| price|quantity|  profit|            amount|discount|
+-----------+----------+------+--------+------------+------+--------+--------+------------------+--------+
|10021265709|2010-10-13|  华北|    河北|    办公用品| 38.94|       6| -213.25|            233.64|     0.5|
|10021250753|2012-02-20|  华南|    河南|    办公用品|  2.08|       2|   -4.64|              4.16|     0.5|
|10021257699|2011-07-15|  华南|    广东|    家具产品|107.53|      26| 1054.82|           2795.78|     0.5|
|10021258358|2011-07-15|  华北|  内蒙古|    家具产品| 70.89|      24|-1748.56|1701.3600000000001|     0.5|
|10021249836|2011-07-15|  华北|  内蒙古|    数码电子|  7.99|      23|  -85.13|            183.77|     0.5|
+-----------+----------+------+--------+------------+------+--------+--------+------------------+--------+
only showing top 5 rows
'''

df.drop('region','product_type').show(5)
'''
+-----------+----------+--------+------+--------+--------+------------------+--------+
|   order_id|order_date|province| price|quantity|  profit|            amount|discount|
+-----------+----------+--------+------+--------+--------+------------------+--------+
|10021265709|2010-10-13|    河北| 38.94|       6| -213.25|            233.64|     0.5|
|10021250753|2012-02-20|    河南|  2.08|       2|   -4.64|              4.16|     0.5|
|10021257699|2011-07-15|    广东|107.53|      26| 1054.82|           2795.78|     0.5|
|10021258358|2011-07-15|  内蒙古| 70.89|      24|-1748.56|1701.3600000000001|     0.5|
|10021249836|2011-07-15|  内蒙古|  7.99|      23|  -85.13|            183.77|     0.5|
+-----------+----------+--------+------+--------+--------+------------------+--------+
only showing top 5 rows
'''

4. Renaming columns

# pandas rename has an inplace=True parameter; Spark has no equivalent
df.withColumnRenamed('product_type','category').show(5)  # renames one column at a time (a multi-column sketch follows the output below)
'''
+-----------+----------+------+--------+--------+------+--------+--------+------------------+--------+
|   order_id|order_date|region|province|category| price|quantity|  profit|            amount|discount|
+-----------+----------+------+--------+--------+------+--------+--------+------------------+--------+
|10021265709|2010-10-13|  华北|    河北|办公用品| 38.94|       6| -213.25|            233.64|     0.5|
|10021250753|2012-02-20|  华南|    河南|办公用品|  2.08|       2|   -4.64|              4.16|     0.5|
|10021257699|2011-07-15|  华南|    广东|家具产品|107.53|      26| 1054.82|           2795.78|     0.5|
|10021258358|2011-07-15|  华北|  内蒙古|家具产品| 70.89|      24|-1748.56|1701.3600000000001|     0.5|
|10021249836|2011-07-15|  华北|  内蒙古|数码电子|  7.99|      23|  -85.13|            183.77|     0.5|
+-----------+----------+------+--------+--------+------+--------+--------+------------------+--------+
only showing top 5 rows
'''
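To rename several columns in one pass, a sketch that chains withColumnRenamed over a dict of old-to-new names (the mapping below is illustrative):

mapping = {'product_type': 'category', 'order_date': 'date'}
df_renamed = df
for old, new in mapping.items():
    df_renamed = df_renamed.withColumnRenamed(old, new)
df_renamed.show(5)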

# rename several columns at once: toDF replaces all column names in order
df.toDF('订单编号','订单日期','区域','省份','产品类型','价格','数量','利润','金额','折扣').show(5)
'''
+-----------+----------+----+------+--------+------+----+--------+------------------+----+
|   订单编号|  订单日期|区域|  省份|产品类型|  价格|数量|    利润|              金额|折扣|
+-----------+----------+----+------+--------+------+----+--------+------------------+----+
|10021265709|2010-10-13|华北|  河北|办公用品| 38.94|   6| -213.25|            233.64| 0.5|
|10021250753|2012-02-20|华南|  河南|办公用品|  2.08|   2|   -4.64|              4.16| 0.5|
|10021257699|2011-07-15|华南|  广东|家具产品|107.53|  26| 1054.82|           2795.78| 0.5|
|10021258358|2011-07-15|华北|内蒙古|家具产品| 70.89|  24|-1748.56|1701.3600000000001| 0.5|
|10021249836|2011-07-15|华北|内蒙古|数码电子|  7.99|  23|  -85.13|            183.77| 0.5|
+-----------+----------+----+------+--------+------+----+--------+------------------+----+
only showing top 5 rows
'''

5. Renaming a DataFrame

df_new = df

df_new.show(5)
'''
+-----------+----------+------+--------+------------+------+--------+--------+------------------+--------+
|   order_id|order_date|region|province|product_type| price|quantity|  profit|            amount|discount|
+-----------+----------+------+--------+------------+------+--------+--------+------------------+--------+
|10021265709|2010-10-13|  华北|    河北|    办公用品| 38.94|       6| -213.25|            233.64|     0.5|
|10021250753|2012-02-20|  华南|    河南|    办公用品|  2.08|       2|   -4.64|              4.16|     0.5|
|10021257699|2011-07-15|  华南|    广东|    家具产品|107.53|      26| 1054.82|           2795.78|     0.5|
|10021258358|2011-07-15|  华北|  内蒙古|    家具产品| 70.89|      24|-1748.56|1701.3600000000001|     0.5|
|10021249836|2011-07-15|  华北|  内蒙古|    数码电子|  7.99|      23|  -85.13|            183.77|     0.5|
+-----------+----------+------+--------+------------+------+--------+--------+------------------+--------+
only showing top 5 rows
'''

6. Sorting

  • Sort by a single column
# sort or orderBy
df.sort(df['order_date'].desc()).show(5)
df.orderBy(df['region'].asc()).show(5)
'''
+-----------+----------+------+--------+------------+------+--------+------+------+--------+
|   order_id|order_date|region|province|product_type| price|quantity|profit|amount|discount|
+-----------+----------+------+--------+------------+------+--------+------+------+--------+
|10021585147|2012-12-30|  华南|    广东|    办公用品|  7.28|      37|-18.66|269.36|     0.5|
|10021515764|2012-12-30|  华北|  内蒙古|    家具产品| 13.73|      45|-33.47|617.85|     0.5|
|10021338256|2012-12-30|  华东|    浙江|    办公用品|  1.48|      10| -1.29|  14.8|     0.5|
|10021340583|2012-12-30|  东北|    辽宁|    数码电子| 19.98|      31| 27.85|619.38|     0.5|
|10021673269|2012-12-30|  东北|    辽宁|    办公用品|832.81|       1|-745.2|832.81|     0.5|
+-----------+----------+------+--------+------------+------+--------+------+------+--------+
only showing top 5 rows

+-----------+----------+------+--------+------------+------+--------+-------+-----------------+--------+
|   order_id|order_date|region|province|product_type| price|quantity| profit|           amount|discount|
+-----------+----------+------+--------+------------+------+--------+-------+-----------------+--------+
|10021256099|2010-12-28|  东北|    辽宁|    办公用品|  6.54|      33| -55.11|           215.82|     0.5|
|10021268757|2009-06-13|  东北|    吉林|    数码电子| 85.99|      16|-172.55|          1375.84|     0.5|
|10021255969|2010-06-28|  东北|    辽宁|    数码电子|140.99|      47|1680.79|6626.530000000001|     0.5|
|10021253491|2011-07-15|  东北|    辽宁|    数码电子|  8.46|      15|-128.38|            126.9|     0.5|
|10021255756|2009-05-16|  东北|    辽宁|    办公用品|   5.4|      25|-136.25|            135.0|     0.5|
+-----------+----------+------+--------+------------+------+--------+-------+-----------------+--------+
only showing top 5 rows
'''
  • Sort by multiple columns
# df.sort(df['region'].asc(),df['order_date'].desc()).show(5)
# df.orderBy(df['region'].asc(),df['order_date'].desc()).show(5)
df.orderBy(['region','order_date'],ascending=[True,False]).show(5)
'''
+-----------+----------+------+--------+------------+------+--------+------+------------------+--------+
|   order_id|order_date|region|province|product_type| price|quantity|profit|            amount|discount|
+-----------+----------+------+--------+------------+------+--------+------+------------------+--------+
|10021673269|2012-12-30|  东北|    辽宁|    办公用品|832.81|       1|-745.2|            832.81|     0.5|
|10021340583|2012-12-30|  东北|    辽宁|    数码电子| 19.98|      31| 27.85|            619.38|     0.5|
|10021280768|2012-12-24|  东北|    吉林|    办公用品|  6.81|       7|-21.38|47.669999999999995|     0.5|
|10021386342|2012-12-21|  东北|    辽宁|    数码电子|145.45|      17| 43.03|2472.6499999999996|     0.5|
|10021421171|2012-12-19|  东北|    辽宁|    办公用品|  6.23|      25|-91.28|            155.75|     0.5|
+-----------+----------+------+--------+------------+------+--------+------+------------------+--------+
only showing top 5 rows
'''

7. Replacing values

df.show(5)
'''
+-----------+----------+------+--------+------------+------+--------+--------+------------------+--------+
|   order_id|order_date|region|province|product_type| price|quantity|  profit|            amount|discount|
+-----------+----------+------+--------+------------+------+--------+--------+------------------+--------+
|10021265709|2010-10-13|  华北|    河北|    办公用品| 38.94|       6| -213.25|            233.64|     0.5|
|10021250753|2012-02-20|  华南|    河南|    办公用品|  2.08|       2|   -4.64|              4.16|     0.5|
|10021257699|2011-07-15|  华南|    广东|    家具产品|107.53|      26| 1054.82|           2795.78|     0.5|
|10021258358|2011-07-15|  华北|  内蒙古|    家具产品| 70.89|      24|-1748.56|1701.3600000000001|     0.5|
|10021249836|2011-07-15|  华北|  内蒙古|    数码电子|  7.99|      23|  -85.13|            183.77|     0.5|
+-----------+----------+------+--------+------------+------+--------+--------+------------------+--------+
only showing top 5 rows
'''

dct = {'办公用品':'办公','家具产品':'家具','数码电子':'数码'}
df.replace(to_replace=dct,subset='product_type').show(5)
'''
+-----------+----------+------+--------+------------+------+--------+--------+------------------+--------+
|   order_id|order_date|region|province|product_type| price|quantity|  profit|            amount|discount|
+-----------+----------+------+--------+------------+------+--------+--------+------------------+--------+
|10021265709|2010-10-13|  华北|    河北|        办公| 38.94|       6| -213.25|            233.64|     0.5|
|10021250753|2012-02-20|  华南|    河南|        办公|  2.08|       2|   -4.64|              4.16|     0.5|
|10021257699|2011-07-15|  华南|    广东|        家具|107.53|      26| 1054.82|           2795.78|     0.5|
|10021258358|2011-07-15|  华北|  内蒙古|        家具| 70.89|      24|-1748.56|1701.3600000000001|     0.5|
|10021249836|2011-07-15|  华北|  内蒙古|        数码|  7.99|      23|  -85.13|            183.77|     0.5|
+-----------+----------+------+--------+------------+------+--------+--------+------------------+--------+
only showing top 5 rows
'''

# replace both 办公用品 and 家具产品 with 办公和家具
lst = ['办公用品','家具产品']
df.replace(to_replace=lst,value='办公和家具',subset='product_type').show(5)
'''
+-----------+----------+------+--------+------------+------+--------+--------+------------------+--------+
|   order_id|order_date|region|province|product_type| price|quantity|  profit|            amount|discount|
+-----------+----------+------+--------+------------+------+--------+--------+------------------+--------+
|10021265709|2010-10-13|  华北|    河北|  办公和家具| 38.94|       6| -213.25|            233.64|     0.5|
|10021250753|2012-02-20|  华南|    河南|  办公和家具|  2.08|       2|   -4.64|              4.16|     0.5|
|10021257699|2011-07-15|  华南|    广东|  办公和家具|107.53|      26| 1054.82|           2795.78|     0.5|
|10021258358|2011-07-15|  华北|  内蒙古|  办公和家具| 70.89|      24|-1748.56|1701.3600000000001|     0.5|
|10021249836|2011-07-15|  华北|  内蒙古|    数码电子|  7.99|      23|  -85.13|            183.77|     0.5|
+-----------+----------+------+--------+------------+------+--------+--------+------------------+--------+
only showing top 5 rows
'''

8. Dropping null values

# df.na.drop() is equivalent to df.dropna()
# how='any': drop the row if any of the subset columns is null
# how='all': drop the row only if all of the subset columns are null
# df.dropna(how='all',subset=['region','province']).show(5)

# drop the row if either of the two columns is null
df.dropna(how='any',subset=['region','province']).show(5)
'''
+-----------+----------+------+--------+------------+------+--------+--------+------------------+--------+
|   order_id|order_date|region|province|product_type| price|quantity|  profit|            amount|discount|
+-----------+----------+------+--------+------------+------+--------+--------+------------------+--------+
|10021265709|2010-10-13|  华北|    河北|    办公用品| 38.94|       6| -213.25|            233.64|     0.5|
|10021250753|2012-02-20|  华南|    河南|    办公用品|  2.08|       2|   -4.64|              4.16|     0.5|
|10021257699|2011-07-15|  华南|    广东|    家具产品|107.53|      26| 1054.82|           2795.78|     0.5|
|10021258358|2011-07-15|  华北|  内蒙古|    家具产品| 70.89|      24|-1748.56|1701.3600000000001|     0.5|
|10021249836|2011-07-15|  华北|  内蒙古|    数码电子|  7.99|      23|  -85.13|            183.77|     0.5|
+-----------+----------+------+--------+------------+------+--------+--------+------------------+--------+
only showing top 5 rows
'''

9. Filling null values

# df.na.fill() is equivalent to df.fillna()
df.fillna(value=0,subset='profit').show(5)
'''
+-----------+----------+------+--------+------------+------+--------+--------+------------------+--------+
|   order_id|order_date|region|province|product_type| price|quantity|  profit|            amount|discount|
+-----------+----------+------+--------+------------+------+--------+--------+------------------+--------+
|10021265709|2010-10-13|  华北|    河北|    办公用品| 38.94|       6| -213.25|            233.64|     0.5|
|10021250753|2012-02-20|  华南|    河南|    办公用品|  2.08|       2|   -4.64|              4.16|     0.5|
|10021257699|2011-07-15|  华南|    广东|    家具产品|107.53|      26| 1054.82|           2795.78|     0.5|
|10021258358|2011-07-15|  华北|  内蒙古|    家具产品| 70.89|      24|-1748.56|1701.3600000000001|     0.5|
|10021249836|2011-07-15|  华北|  内蒙古|    数码电子|  7.99|      23|  -85.13|            183.77|     0.5|
+-----------+----------+------+--------+------------+------+--------+--------+------------------+--------+
only showing top 5 rows

'''

10. Dropping duplicates

# drop_duplicates is an alias for dropDuplicates
df.drop_duplicates().show(5)  # drop duplicate rows based on all columns
'''
+-----------+----------+------+--------+------------+------+--------+-------+-------+--------+
|   order_id|order_date|region|province|product_type| price|quantity| profit| amount|discount|
+-----------+----------+------+--------+------------+------+--------+-------+-------+--------+
|10021278668|2010-01-26|  华北|    山西|    办公用品|  17.7|      47|-131.27|  831.9|     0.5|
|10021374233|2010-03-28|  东北|    辽宁|    家具产品|209.37|      47|-505.98|9840.39|     0.5|
|10021426866|2009-04-09|  华南|    广西|    数码电子| 40.98|      26|-127.11|1065.48|     0.5|
|10021256790|2010-03-05|  华北|    北京|    办公用品| 30.98|      15| 117.39|  464.7|     0.5|
|10021251867|2011-08-16|  西北|    宁夏|    数码电子| 50.98|      34| -82.16|1733.32|     0.5|
+-----------+----------+------+--------+------------+------+--------+-------+-------+--------+
only showing top 5 rows
'''

# drop duplicate rows based on the specified columns
df.drop_duplicates(subset=['order_id','region']).show(5)
'''
+-----------+----------+------+--------+------------+------+--------+-------+-----------------+--------+
|   order_id|order_date|region|province|product_type| price|quantity| profit|           amount|discount|
+-----------+----------+------+--------+------------+------+--------+-------+-----------------+--------+
|10021248927|2010-08-02|  华南|    广东|    办公用品| 46.89|      12| 148.34|562.6800000000001|     0.5|
|10021248932|2010-10-02|  华北|    河北|    办公用品|  8.04|      23|-117.16|           184.92|     0.5|
|10021250042|2010-12-14|  华南|    广西|    数码电子| 20.95|      11| -68.76|           230.45|     0.5|
|10021251252|2012-08-01|  华南|    海南|    家具产品|  4.95|      34| -59.71|            168.3|     0.5|
|10021252488|2011-02-15|  华北|    山西|    数码电子|119.99|      39| 239.41|          4679.61|     0.5|
+-----------+----------+------+--------+------------+------+--------+-------+-----------------+--------+
only showing top 5 rows
'''

11. Converting column data types

  • Convert price to an integer and quantity to a float
from pyspark.sql import functions as f
from pyspark.sql import types as t

df.show(3)
'''
+-----------+----------+------+--------+------------+------+--------+-------+-------+--------+
|   order_id|order_date|region|province|product_type| price|quantity| profit| amount|discount|
+-----------+----------+------+--------+------------+------+--------+-------+-------+--------+
|10021265709|2010-10-13|  华北|    河北|    办公用品| 38.94|       6|-213.25| 233.64|     0.5|
|10021250753|2012-02-20|  华南|    河南|    办公用品|  2.08|       2|  -4.64|   4.16|     0.5|
|10021257699|2011-07-15|  华南|    广东|    家具产品|107.53|      26|1054.82|2795.78|     0.5|
+-----------+----------+------+--------+------------+------+--------+-------+-------+--------+
only showing top 3 rows
'''

df.select(
    df['price'].astype(t.IntegerType()),
    df['quantity'].astype(t.FloatType())
).show(5)

# df.select(
#     df['price'].cast(t.IntegerType()),
#     df['quantity'].cast(t.FloatType())
# ).show(5)

# df.select(
#     f.col('price').astype(t.IntegerType()),
#     f.col('quantity').astype(t.FloatType())
# ).show(5)
'''
+-----+--------+
|price|quantity|
+-----+--------+
|   38|     6.0|
|    2|     2.0|
|  107|    26.0|
|   70|    24.0|
|    7|    23.0|
+-----+--------+
only showing top 5 rows
'''

12. Converting the DataFrame to other formats

  • Convert the Spark DataFrame to a pandas DataFrame
pd_df = df.toPandas()
pd_df.head(5)

pd_df.groupby('region').agg({'price':'mean','quantity':'sum'}).reset_index()

  • Convert the Spark DataFrame to JSON
# if this raises an error, run it again
df.toJSON().take(1)
'''
['{"order_id":10021265709,"order_date":"2010-10-13","region":"华北","province":"河北","product_type":"办公用品","price":38.94,"quantity":6,"profit":-213.25,"amount":233.64,"discount":0.5}']
'''
  • Convert the Spark DataFrame to a Python iterator
it = df.toLocalIterator()
# an iterator remembers its position in the traversal; it can only move forward, never backward, until all elements have been visited
import time
for row in it:
    print(row)
    time.sleep(3)

3) SQL-like statistical operations

df = spark.read.csv("data/order.csv", header=True, inferSchema=True)
df.show(5)
'''
+-----------+----------+------+--------+------------+------+--------+--------+
|   order_id|order_date|region|province|product_type| price|quantity|  profit|
+-----------+----------+------+--------+------------+------+--------+--------+
|10021265709|2010-10-13|  华北|    河北|    办公用品| 38.94|       6| -213.25|
|10021250753|2012-02-20|  华南|    河南|    办公用品|  2.08|       2|   -4.64|
|10021257699|2011-07-15|  华南|    广东|    家具产品|107.53|      26| 1054.82|
|10021258358|2011-07-15|  华北|  内蒙古|    家具产品| 70.89|      24|-1748.56|
|10021249836|2011-07-15|  华北|  内蒙古|    数码电子|  7.99|      23|  -85.13|
+-----------+----------+------+--------+------------+------+--------+--------+
only showing top 5 rows
'''

1. Aggregation

# aggregate over the whole table
df.agg({'price':'max','quantity':'sum'}).show()

# rename the columns after aggregation
df.agg({'price':'max','quantity':'sum'}).withColumnRenamed('sum(quantity)','sum_quantity').withColumnRenamed('max(price)','max_price').show()
'''
+-------------+----------+
|sum(quantity)|max(price)|
+-------------+----------+
|       218830|   6783.02|
+-------------+----------+

+------------+---------+
|sum_quantity|max_price|
+------------+---------+
|      218830|  6783.02|
+------------+---------+
'''
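Equivalently, a sketch that names the aggregates up front with alias instead of renaming them afterwards:

from pyspark.sql import functions as f

df.agg(
    f.sum('quantity').alias('sum_quantity'),
    f.max('price').alias('max_price')
).show()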

2. Descriptive statistics

df.show(3)
'''
+-----------+----------+------+--------+------------+------+--------+-------+
|   order_id|order_date|region|province|product_type| price|quantity| profit|
+-----------+----------+------+--------+------------+------+--------+-------+
|10021265709|2010-10-13|  华北|    河北|    办公用品| 38.94|       6|-213.25|
|10021250753|2012-02-20|  华南|    河南|    办公用品|  2.08|       2|  -4.64|
|10021257699|2011-07-15|  华南|    广东|    家具产品|107.53|      26|1054.82|
+-----------+----------+------+--------+------------+------+--------+-------+
only showing top 3 rows
'''

# describe
df.select('price','quantity','profit').describe().show()

# summary
df.select('price','quantity','profit').summary().show()
'''
+-------+------------------+------------------+------------------+
|summary|             price|          quantity|            profit|
+-------+------------------+------------------+------------------+
|  count|              8567|              8567|              8567|
|   mean| 88.39652153612401| 25.54336407143691|180.81604062098822|
| stddev|279.10699118992653|14.488305686423686| 1180.476274473487|
|    min|              0.99|                 1|          -14140.7|
|    max|           6783.02|                50|          27220.69|
+-------+------------------+------------------+------------------+

+-------+------------------+------------------+------------------+
|summary|             price|          quantity|            profit|
+-------+------------------+------------------+------------------+
|  count|              8567|              8567|              8567|
|   mean| 88.39652153612401| 25.54336407143691|180.81604062098822|
| stddev|279.10699118992653|14.488305686423686| 1180.476274473487|
|    min|              0.99|                 1|          -14140.7|
|    25%|              6.48|                13|            -83.54|
|    50%|             20.99|                26|             -1.57|
|    75%|             89.83|                38|            162.11|
|    max|           6783.02|                50|          27220.69|
+-------+------------------+------------------+------------------+
'''

3. Frequent items

# find values of a column that occur more often than the given support threshold
# provinces and quantities with a frequency above 60%
# df.stat.freqItems
df.freqItems(cols=['province','quantity'],support=0.6).show()
'''
+------------------+------------------+
|province_freqItems|quantity_freqItems|
+------------------+------------------+
|            [广东]|              [20]|
+------------------+------------------+
'''

4. Quantiles

# col: the column (or columns) to compute quantiles for
# probabilities: values between 0 and 1, must be floats; pass a list or tuple for multiple quantiles
# relativeError: relative error as a float; 0.0 means exact (but more expensive) computation
# df.stat.approxQuantile
df.approxQuantile('price', [0.0, 0.5, 1.0], 0.0)   # returns a list

# [0.99, 20.99, 6783.02]
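approxQuantile also accepts a list of columns and returns one list of quantiles per column; the 0.01 relative error here is illustrative:

# approximate quartiles of price and profit
df.approxQuantile(['price', 'profit'], [0.25, 0.5, 0.75], 0.01)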

5. Covariance

  • Covariance between quantity and profit
# if y tends to increase as x increases, the two variables move in the same direction and the covariance is positive
# if y tends to decrease as x increases, the two variables move in opposite directions and the covariance is negative
# if the covariance of x and y is zero, the two variables are linearly uncorrelated

# given x, y and z:
# is x more strongly related to y or to z?
# if x, y and z share the same units, their covariances can be compared to judge the strength of the relationships
# if their units differ, covariances cannot be compared this way; use the correlation coefficient instead

pd_df = df.toPandas()

x = pd_df['quantity']
y = pd_df['profit']
n = len(pd_df)  # sample size

x_dev = x - x.mean()  # deviation from the mean
y_dev = y - y.mean()  # deviation from the mean

# sample covariance
cov = (x_dev * y_dev).sum() / (n - 1)
cov
# 3286.2303760108925

pd_df[['quantity','profit','price']].cov()

# pandas: covariance of two columns
pd_df['quantity'].cov(pd_df['profit'])
# 3286.230376010893

# Spark: covariance of two columns
df.cov('quantity','profit')
# 3286.2303760108866

6. Correlation coefficient

  • Correlation coefficient between quantity and profit
pd_df = df.toPandas()

x = pd_df['quantity']
y = pd_df['profit']

corr = x.cov(y) / (x.std() * y.std())
corr  # correlation coefficient

corr = x.corr(y)
corr
# 0.19214236955780314

# pandas: correlation matrix
pd_df[['quantity','profit','price']].corr()

# Spark: correlation coefficient of two columns
df.corr('quantity','profit')
# 0.19214236955780292

7. Set operations

df1 = spark.createDataFrame(
    data=[("a", 1),
          ("a", 1),
          ("a", 1),
          ("a", 2),
          ("b", 3),
          ("c", 4)],
    schema=["C1", "C2"])
df2 = spark.createDataFrame(
    data=[("a", 1),
          ("a", 1),
          ("b", 3)],
    schema=["C1", "C2"])
  • Difference
df1.exceptAll(df2).show() # keeps duplicates
df1.subtract(df2).show() # deduplicates
'''
+---+---+
| C1| C2|
+---+---+
|  a|  1|
|  a|  2|
|  c|  4|
+---+---+

+---+---+
| C1| C2|
+---+---+
|  a|  2|
|  c|  4|
+---+---+
'''
  • Intersection
df1.intersectAll(df2).show()   # keeps duplicates
df1.intersect(df2).show()   # deduplicates
'''
+---+---+
| C1| C2|
+---+---+
|  b|  3|
|  a|  1|
|  a|  1|
+---+---+

+---+---+
| C1| C2|
+---+---+
|  b|  3|
|  a|  1|
+---+---+
'''
  • Union
# unionAll is equivalent to union
df1.unionAll(df2).show()   # keeps duplicates
df1.union(df2).show()   # keeps duplicates
df1.union(df2).distinct().show()  # deduplicate after the union
'''
+---+---+
| C1| C2|
+---+---+
|  a|  1|
|  a|  1|
|  a|  1|
|  a|  2|
|  b|  3|
|  c|  4|
|  a|  1|
|  a|  1|
|  b|  3|
+---+---+

+---+---+
| C1| C2|
+---+---+
|  a|  1|
|  a|  1|
|  a|  1|
|  a|  2|
|  b|  3|
|  c|  4|
|  a|  1|
|  a|  1|
|  b|  3|
+---+---+

+---+---+
| C1| C2|
+---+---+
|  b|  3|
|  a|  1|
|  a|  2|
|  c|  4|
+---+---+
'''

8. Random sampling

df.show(5)
'''
+-----------+----------+------+--------+------------+------+--------+--------+
|   order_id|order_date|region|province|product_type| price|quantity|  profit|
+-----------+----------+------+--------+------------+------+--------+--------+
|10021265709|2010-10-13|  华北|    河北|    办公用品| 38.94|       6| -213.25|
|10021250753|2012-02-20|  华南|    河南|    办公用品|  2.08|       2|   -4.64|
|10021257699|2011-07-15|  华南|    广东|    家具产品|107.53|      26| 1054.82|
|10021258358|2011-07-15|  华北|  内蒙古|    家具产品| 70.89|      24|-1748.56|
|10021249836|2011-07-15|  华北|  内蒙古|    数码电子|  7.99|      23|  -85.13|
+-----------+----------+------+--------+------------+------+--------+--------+
only showing top 5 rows
'''

# withReplacement: whether to sample with replacement
# fraction: sampling fraction
# seed: random seed
df.sample(fraction=0.5, seed=2).show(5)
'''
+-----------+----------+------+--------+------------+------+--------+--------+
|   order_id|order_date|region|province|product_type| price|quantity|  profit|
+-----------+----------+------+--------+------------+------+--------+--------+
|10021250753|2012-02-20|  华南|    河南|    办公用品|  2.08|       2|   -4.64|
|10021257699|2011-07-15|  华南|    广东|    家具产品|107.53|      26| 1054.82|
|10021258358|2011-07-15|  华北|  内蒙古|    家具产品| 70.89|      24|-1748.56|
|10021253491|2011-07-15|  东北|    辽宁|    数码电子|  8.46|      15| -128.38|
|10021266565|2011-10-22|  东北|    吉林|    办公用品|  9.11|      30|   60.72|
+-----------+----------+------+--------+------------+------+--------+--------+
only showing top 5 rows
'''
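
# For completeness, a sketch of sampling with replacement; the fraction and seed below are illustrative, not from the original:
df.sample(withReplacement=True, fraction=0.5, seed=2).show(5)  # the same row may be drawn more than once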

9. Random split

# e.g. a 30/70 or 40/60 split
# randomly split the data by weights (proportions); returns a list of DataFrames
df_split = df.randomSplit([0.3,0.7],seed=1)
df_split
'''
[DataFrame[order_id: bigint, order_date: string, region: string, province: string, product_type: string, price: double, quantity: int, profit: double],
 DataFrame[order_id: bigint, order_date: string, region: string, province: string, product_type: string, price: double, quantity: int, profit: double]]
'''

df_split[0].show(5)   # first part, about 30% of the rows
df_split[1].show(5)   # second part, about 70% of the rows
'''
+-----------+----------+------+--------+------------+-----+--------+-------+
|   order_id|order_date|region|province|product_type|price|quantity| profit|
+-----------+----------+------+--------+------------+-----+--------+-------+
|10021247897|2011-02-11|  西南|    四川|    家具产品| 8.75|      10| -16.49|
|10021247909|2010-05-19|  华南|    广东|    办公用品| 3.57|      46|-119.35|
|10021248082|2010-07-24|  华北|    山西|    办公用品| 5.98|      48|-193.48|
|10021248110|2012-02-15|  华东|    福建|    办公用品| 3.78|      28|   46.3|
|10021248134|2010-02-17|  华北|    山西|    办公用品| 4.98|      18|  -33.4|
+-----------+----------+------+--------+------------+-----+--------+-------+
only showing top 5 rows

+-----------+----------+------+--------+------------+------+--------+-------+
|   order_id|order_date|region|province|product_type| price|quantity| profit|
+-----------+----------+------+--------+------------+------+--------+-------+
|10021247874|2012-03-10|  华南|    河南|    家具产品|136.98|      14| 761.04|
|10021247876|2010-03-02|  华东|    安徽|    数码电子| 45.99|      29| 218.73|
|10021247974|2010-07-16|  华南|    河南|    数码电子|179.99|      35|1142.08|
|10021248074|2012-04-04|  华北|    山西|    家具产品|  7.28|      49|-197.25|
|10021248111|2009-02-12|  华北|    北京|    办公用品| 12.53|       1| -20.32|
+-----------+----------+------+--------+------------+------+--------+-------+
only showing top 5 rows
'''

df_split[0].count()   # row count of the first part
# 2068

df_split[1].count()  # row count of the second part
# 5959

4. SQL-style table operations

1. Table queries

df = spark.read.csv("data/order.csv", header=True, inferSchema=True)
df.show(5)
'''
+-----------+----------+------+--------+------------+------+--------+--------+
|   order_id|order_date|region|province|product_type| price|quantity|  profit|
+-----------+----------+------+--------+------------+------+--------+--------+
|10021265709|2010-10-13|  华北|    河北|    办公用品| 38.94|       6| -213.25|
|10021250753|2012-02-20|  华南|    河南|    办公用品|  2.08|       2|   -4.64|
|10021257699|2011-07-15|  华南|    广东|    家具产品|107.53|      26| 1054.82|
|10021258358|2011-07-15|  华北|  内蒙古|    家具产品| 70.89|      24|-1748.56|
|10021249836|2011-07-15|  华北|  内蒙古|    数码电子|  7.99|      23|  -85.13|
+-----------+----------+------+--------+------------+------+--------+--------+
only showing top 5 rows
'''
  • Deduplicating with distinct
df.distinct().show(5)
df.distinct().count()  # 8567
'''
+-----------+----------+------+--------+------------+-------+--------+-------+
|   order_id|order_date|region|province|product_type|  price|quantity| profit|
+-----------+----------+------+--------+------------+-------+--------+-------+
|10021285407|2009-04-02|  华北|    天津|    办公用品|   47.9|      11| 144.05|
|10021319881|2009-01-07|  华东|    浙江|    数码电子|6783.02|       7| 102.61|
|10021347280|2010-01-09|  西北|    甘肃|    数码电子| 500.98|      32|7176.12|
|10021377672|2010-06-08|  东北|  黑龙江|    办公用品|  15.67|      50|  353.2|
|10021293479|2011-04-20|  华北|    天津|    家具产品| 320.64|      12| 104.48|
+-----------+----------+------+--------+------------+-------+--------+-------+
only showing top 5 rows
'''

df.select('region').distinct().count()
# 6
  • Limiting rows with limit
df.select('order_id','order_date').limit(2).show()
'''
+-----------+----------+
|   order_id|order_date|
+-----------+----------+
|10021265709|2010-10-13|
|10021250753|2012-02-20|
+-----------+----------+
'''
  • Filtering with where
df.where("product_type='办公用品' and quantity > 10").show(5)
'''
+-----------+----------+------+--------+------------+-----+--------+------+
|   order_id|order_date|region|province|product_type|price|quantity|profit|
+-----------+----------+------+--------+------------+-----+--------+------+
|10021266565|2011-10-22|  东北|    吉林|    办公用品| 9.11|      30| 60.72|
|10021251834|2009-01-19|  华北|    北京|    办公用品| 2.88|      41|  7.57|
|10021255693|2009-06-03|  华南|    广东|    办公用品| 1.68|      28|  0.35|
|10021253900|2010-12-17|  华南|    广东|    办公用品| 1.86|      48|-107.0|
|10021262933|2010-01-28|  西南|    四川|    办公用品| 2.89|      26| 28.24|
+-----------+----------+------+--------+------------+-----+--------+------+
only showing top 5 rows
'''

# pandas-style syntax
df.where((df['product_type']=='办公用品') & (df['quantity'] > 10)).show(5)
'''
+-----------+----------+------+--------+------------+-----+--------+------+
|   order_id|order_date|region|province|product_type|price|quantity|profit|
+-----------+----------+------+--------+------------+-----+--------+------+
|10021266565|2011-10-22|  东北|    吉林|    办公用品| 9.11|      30| 60.72|
|10021251834|2009-01-19|  华北|    北京|    办公用品| 2.88|      41|  7.57|
|10021255693|2009-06-03|  华南|    广东|    办公用品| 1.68|      28|  0.35|
|10021253900|2010-12-17|  华南|    广东|    办公用品| 1.86|      48|-107.0|
|10021262933|2010-01-28|  西南|    四川|    办公用品| 2.89|      26| 28.24|
+-----------+----------+------+--------+------------+-----+--------+------+
only showing top 5 rows
'''

df.where("region in ('华南','华北') and price > 100").show(5)
'''
+-----------+----------+------+--------+------------+------+--------+-------+
|   order_id|order_date|region|province|product_type| price|quantity| profit|
+-----------+----------+------+--------+------------+------+--------+-------+
|10021257699|2011-07-15|  华南|    广东|    家具产品|107.53|      26|1054.82|
|10021263127|2011-10-22|  华南|    湖北|    数码电子|155.99|      14|  48.99|
|10021248211|2011-03-17|  华南|    广西|    数码电子|115.79|      32| 1470.3|
|10021265473|2010-12-17|  华北|    北京|    数码电子|205.99|      46|2057.17|
|10021248412|2009-04-16|  华北|    北京|    数码电子|125.99|      37|1228.89|
+-----------+----------+------+--------+------------+------+--------+-------+
only showing top 5 rows
'''

df.where((df['region'].isin(['华南','华北'])) & (df['price'] > 100)).show(5)
'''
+-----------+----------+------+--------+------------+------+--------+-------+
|   order_id|order_date|region|province|product_type| price|quantity| profit|
+-----------+----------+------+--------+------------+------+--------+-------+
|10021257699|2011-07-15|  华南|    广东|    家具产品|107.53|      26|1054.82|
|10021263127|2011-10-22|  华南|    湖北|    数码电子|155.99|      14|  48.99|
|10021248211|2011-03-17|  华南|    广西|    数码电子|115.79|      32| 1470.3|
|10021265473|2010-12-17|  华北|    北京|    数码电子|205.99|      46|2057.17|
|10021248412|2009-04-16|  华北|    北京|    数码电子|125.99|      37|1228.89|
+-----------+----------+------+--------+------------+------+--------+-------+
only showing top 5 rows
'''

# like: wildcard filtering
df.where("province like '湖%'").show(5)
'''
+-----------+----------+------+--------+------------+------+--------+-------+
|   order_id|order_date|region|province|product_type| price|quantity| profit|
+-----------+----------+------+--------+------------+------+--------+-------+
|10021263127|2011-10-22|  华南|    湖北|    数码电子|155.99|      14|  48.99|
|10021258830|2012-03-18|  华南|    湖北|    数码电子|155.99|      20| 257.76|
|10021248282|2011-08-20|  华南|    湖南|    数码电子| 65.99|      22|-284.63|
|10021264036|2011-03-04|  华南|    湖北|    办公用品| 34.58|      29| 109.33|
|10021250947|2012-06-20|  华南|    湖北|    家具产品|227.55|      48|1836.81|
+-----------+----------+------+--------+------------+------+--------+-------+
only showing top 5 rows
'''

df.where(df['province'].like('湖%')).show(5)
'''
+-----------+----------+------+--------+------------+------+--------+-------+
|   order_id|order_date|region|province|product_type| price|quantity| profit|
+-----------+----------+------+--------+------------+------+--------+-------+
|10021263127|2011-10-22|  华南|    湖北|    数码电子|155.99|      14|  48.99|
|10021258830|2012-03-18|  华南|    湖北|    数码电子|155.99|      20| 257.76|
|10021248282|2011-08-20|  华南|    湖南|    数码电子| 65.99|      22|-284.63|
|10021264036|2011-03-04|  华南|    湖北|    办公用品| 34.58|      29| 109.33|
|10021250947|2012-06-20|  华南|    湖北|    家具产品|227.55|      48|1836.81|
+-----------+----------+------+--------+------------+------+--------+-------+
only showing top 5 rows

'''

# rlike: regular-expression filtering
df.where("province rlike '湖北|湖南'").show(5)
df.where(df['province'].rlike('湖北|湖南')).show(5)
'''
+-----------+----------+------+--------+------------+------+--------+-------+
|   order_id|order_date|region|province|product_type| price|quantity| profit|
+-----------+----------+------+--------+------------+------+--------+-------+
|10021263127|2011-10-22|  华南|    湖北|    数码电子|155.99|      14|  48.99|
|10021258830|2012-03-18|  华南|    湖北|    数码电子|155.99|      20| 257.76|
|10021248282|2011-08-20|  华南|    湖南|    数码电子| 65.99|      22|-284.63|
|10021264036|2011-03-04|  华南|    湖北|    办公用品| 34.58|      29| 109.33|
|10021250947|2012-06-20|  华南|    湖北|    家具产品|227.55|      48|1836.81|
+-----------+----------+------+--------+------------+------+--------+-------+
only showing top 5 rows

+-----------+----------+------+--------+------------+------+--------+-------+
|   order_id|order_date|region|province|product_type| price|quantity| profit|
+-----------+----------+------+--------+------------+------+--------+-------+
|10021263127|2011-10-22|  华南|    湖北|    数码电子|155.99|      14|  48.99|
|10021258830|2012-03-18|  华南|    湖北|    数码电子|155.99|      20| 257.76|
|10021248282|2011-08-20|  华南|    湖南|    数码电子| 65.99|      22|-284.63|
|10021264036|2011-03-04|  华南|    湖北|    办公用品| 34.58|      29| 109.33|
|10021250947|2012-06-20|  华南|    湖北|    家具产品|227.55|      48|1836.81|
+-----------+----------+------+--------+------------+------+--------+-------+
only showing top 5 rows
'''

# filtering with a broadcast variable
bc = sc.broadcast(['华南','华北'])
df.where(df['region'].isin(bc.value)).show(5)

# filter out 华南 and 华北
df.where(~df['region'].isin(bc.value)).show(5)
'''
+-----------+----------+------+--------+------------+------+--------+--------+
|   order_id|order_date|region|province|product_type| price|quantity|  profit|
+-----------+----------+------+--------+------------+------+--------+--------+
|10021265709|2010-10-13|  华北|    河北|    办公用品| 38.94|       6| -213.25|
|10021250753|2012-02-20|  华南|    河南|    办公用品|  2.08|       2|   -4.64|
|10021257699|2011-07-15|  华南|    广东|    家具产品|107.53|      26| 1054.82|
|10021258358|2011-07-15|  华北|  内蒙古|    家具产品| 70.89|      24|-1748.56|
|10021249836|2011-07-15|  华北|  内蒙古|    数码电子|  7.99|      23|  -85.13|
+-----------+----------+------+--------+------------+------+--------+--------+
only showing top 5 rows

+-----------+----------+------+--------+------------+-----+--------+-------+
|   order_id|order_date|region|province|product_type|price|quantity| profit|
+-----------+----------+------+--------+------------+-----+--------+-------+
|10021253491|2011-07-15|  东北|    辽宁|    数码电子| 8.46|      15|-128.38|
|10021266565|2011-10-22|  东北|    吉林|    办公用品| 9.11|      30|  60.72|
|10021269741|2009-06-03|  西北|    甘肃|    家具产品|30.93|      42| 511.69|
|10021262933|2010-01-28|  西南|    四川|    办公用品| 2.89|      26|  28.24|
|10021263989|2012-05-07|  华东|    福建|    办公用品|18.97|      29|  71.75|
+-----------+----------+------+--------+------------+-----+--------+-------+
only showing top 5 rows
'''
  • Filtering with filter
# filter is equivalent to where
df.filter("product_type='办公用品' and quantity > 10").show(5)
'''
+-----------+----------+------+--------+------------+-----+--------+------+
|   order_id|order_date|region|province|product_type|price|quantity|profit|
+-----------+----------+------+--------+------------+-----+--------+------+
|10021266565|2011-10-22|  东北|    吉林|    办公用品| 9.11|      30| 60.72|
|10021251834|2009-01-19|  华北|    北京|    办公用品| 2.88|      41|  7.57|
|10021255693|2009-06-03|  华南|    广东|    办公用品| 1.68|      28|  0.35|
|10021253900|2010-12-17|  华南|    广东|    办公用品| 1.86|      48|-107.0|
|10021262933|2010-01-28|  西南|    四川|    办公用品| 2.89|      26| 28.24|
+-----------+----------+------+--------+------------+-----+--------+------+
only showing top 5 rows
'''

# pandas-style syntax
df.filter((df['product_type']=='办公用品') & (df['quantity'] > 10)).show(5)
'''
+-----------+----------+------+--------+------------+-----+--------+------+
|   order_id|order_date|region|province|product_type|price|quantity|profit|
+-----------+----------+------+--------+------------+-----+--------+------+
|10021266565|2011-10-22|  东北|    吉林|    办公用品| 9.11|      30| 60.72|
|10021251834|2009-01-19|  华北|    北京|    办公用品| 2.88|      41|  7.57|
|10021255693|2009-06-03|  华南|    广东|    办公用品| 1.68|      28|  0.35|
|10021253900|2010-12-17|  华南|    广东|    办公用品| 1.86|      48|-107.0|
|10021262933|2010-01-28|  西南|    四川|    办公用品| 2.89|      26| 28.24|
+-----------+----------+------+--------+------------+-----+--------+------+
only showing top 5 rows
'''
  • Operations between a column and a constant
df.show(5)
'''
+-----------+----------+------+--------+------------+------+--------+--------+
|   order_id|order_date|region|province|product_type| price|quantity|  profit|
+-----------+----------+------+--------+------------+------+--------+--------+
|10021265709|2010-10-13|  华北|    河北|    办公用品| 38.94|       6| -213.25|
|10021250753|2012-02-20|  华南|    河南|    办公用品|  2.08|       2|   -4.64|
|10021257699|2011-07-15|  华南|    广东|    家具产品|107.53|      26| 1054.82|
|10021258358|2011-07-15|  华北|  内蒙古|    家具产品| 70.89|      24|-1748.56|
|10021249836|2011-07-15|  华北|  内蒙古|    数码电子|  7.99|      23|  -85.13|
+-----------+----------+------+--------+------------+------+--------+--------+
only showing top 5 rows
'''

# alias corresponds to SQL's AS
# alias is equivalent to name
df.select('order_id','price',(df['price']*1.2).alias('price_new')).show(5)
'''
+-----------+------+------------------+
|   order_id| price|         price_new|
+-----------+------+------------------+
|10021265709| 38.94|46.727999999999994|
|10021250753|  2.08|             2.496|
|10021257699|107.53|           129.036|
|10021258358| 70.89|            85.068|
|10021249836|  7.99|             9.588|
+-----------+------+------------------+
only showing top 5 rows
'''
  • Operations between two columns
df.select('order_id','price',(df['price']*df['quantity']).alias('amount')).show(5)
'''
+-----------+------+------------------+
|   order_id| price|            amount|
+-----------+------+------------------+
|10021265709| 38.94|            233.64|
|10021250753|  2.08|              4.16|
|10021257699|107.53|           2795.78|
|10021258358| 70.89|1701.3600000000001|
|10021249836|  7.99|            183.77|
+-----------+------+------------------+
only showing top 5 rows

'''
  • Conditional logic
# when/otherwise is similar to SQL's CASE WHEN
from pyspark.sql import functions as f
df.select(
    'order_id',
    'profit',
    f.when(df['profit'] > 0, '盈利').when(df['profit'] == 0, '持平').otherwise('亏损').name('state')
).show(5)

from pyspark.sql import functions as f
df.select(
    'order_id',
    'profit',
    f.when(df['profit'] > 0, '盈利').when(df['profit'] == 0, '持平').otherwise('亏损').alias('state')
).show(5)
'''
+-----------+--------+-----+
|   order_id|  profit|state|
+-----------+--------+-----+
|10021265709| -213.25| 亏损|
|10021250753|   -4.64| 亏损|
|10021257699| 1054.82| 盈利|
|10021258358|-1748.56| 亏损|
|10021249836|  -85.13| 亏损|
+-----------+--------+-----+
only showing top 5 rows

+-----------+--------+-----+
|   order_id|  profit|state|
+-----------+--------+-----+
|10021265709| -213.25| 亏损|
|10021250753|   -4.64| 亏损|
|10021257699| 1054.82| 盈利|
|10021258358|-1748.56| 亏损|
|10021249836|  -85.13| 亏损|
+-----------+--------+-----+
only showing top 5 rows
'''

2. Table joins

  • Employee table
people = spark.read.csv("data/people.csv", header=True, inferSchema=True)
people.show(5)  
'''
+---+------+---+---+
| id|  name|sex|age|
+---+------+---+---+
|  1|孙尚香| 女| 29|
|  2|  曹操| 男| 21|
|  4|司马懿| 男| 21|
|  5|  貂蝉| 女| 28|
|  7|  大乔| 女| 21|
+---+------+---+---+
only showing top 5 rows
'''
  • Salary table
salary = spark.read.csv("data/salary.csv", header=True, inferSchema=True)
salary.show(5)  
'''
+---+------+-----+------+
| id|  name|level|salary|
+---+------+-----+------+
|  2|  曹操|    1|   15K|
|  3|  荀彧|    2|   10K|
|  4|司马懿|    2|   10K|
|  5|  貂蝉|    3|    5K|
|  6|  孙权|    3|    5K|
+---+------+-----+------+
only showing top 5 rows
'''
  • inner join
people.join(salary, on='id', how='inner').show()  # inner is the default join type
people.join(salary, on=['id','name'], how='inner').show()
'''
+---+------+---+---+------+-----+------+
| id|  name|sex|age|  name|level|salary|
+---+------+---+---+------+-----+------+
|  2|  曹操| 男| 21|  曹操|    1|   15K|
|  4|司马懿| 男| 21|司马懿|    2|   10K|
|  5|  貂蝉| 女| 28|  貂蝉|    3|    5K|
|  7|  大乔| 女| 21|  大乔|    3|    5K|
+---+------+---+---+------+-----+------+

+---+------+---+---+-----+------+
| id|  name|sex|age|level|salary|
+---+------+---+---+-----+------+
|  2|  曹操| 男| 21|    1|   15K|
|  4|司马懿| 男| 21|    2|   10K|
|  5|  貂蝉| 女| 28|    3|    5K|
|  7|  大乔| 女| 21|    3|    5K|
+---+------+---+---+-----+------+
'''
  • left join
people.join(salary, on='id', how='left').show()
'''
+---+------+---+---+------+-----+------+
| id|  name|sex|age|  name|level|salary|
+---+------+---+---+------+-----+------+
|  1|孙尚香| 女| 29|  null| null|  null|
|  2|  曹操| 男| 21|  曹操|    1|   15K|
|  4|司马懿| 男| 21|司马懿|    2|   10K|
|  5|  貂蝉| 女| 28|  貂蝉|    3|    5K|
|  7|  大乔| 女| 21|  大乔|    3|    5K|
|  8|诸葛亮| 男| 22|  null| null|  null|
| 10|邢道荣| 男| 29|  null| null|  null|
+---+------+---+---+------+-----+------+
'''
  • right join
people.join(salary, on='id', how='right').show()
'''
+---+------+----+----+------+-----+------+
| id|  name| sex| age|  name|level|salary|
+---+------+----+----+------+-----+------+
|  2|  曹操|  男|  21|  曹操|    1|   15K|
|  3|  null|null|null|  荀彧|    2|   10K|
|  4|司马懿|  男|  21|司马懿|    2|   10K|
|  5|  貂蝉|  女|  28|  貂蝉|    3|    5K|
|  6|  null|null|null|  孙权|    3|    5K|
|  7|  大乔|  女|  21|  大乔|    3|    5K|
|  9|  null|null|null|  小乔|    2|   10K|
+---+------+----+----+------+-----+------+
'''
  • outer (full) join
people.join(salary, on='id', how='outer').show()
people.join(salary, on='id', how='full').show()
'''
+---+------+----+----+------+-----+------+
| id|  name| sex| age|  name|level|salary|
+---+------+----+----+------+-----+------+
|  1|孙尚香|  女|  29|  null| null|  null|
|  6|  null|null|null|  孙权|    3|    5K|
|  3|  null|null|null|  荀彧|    2|   10K|
|  5|  貂蝉|  女|  28|  貂蝉|    3|    5K|
|  9|  null|null|null|  小乔|    2|   10K|
|  4|司马懿|  男|  21|司马懿|    2|   10K|
|  8|诸葛亮|  男|  22|  null| null|  null|
|  7|  大乔|  女|  21|  大乔|    3|    5K|
| 10|邢道荣|  男|  29|  null| null|  null|
|  2|  曹操|  男|  21|  曹操|    1|   15K|
+---+------+----+----+------+-----+------+

+---+------+----+----+------+-----+------+
| id|  name| sex| age|  name|level|salary|
+---+------+----+----+------+-----+------+
|  1|孙尚香|  女|  29|  null| null|  null|
|  6|  null|null|null|  孙权|    3|    5K|
|  3|  null|null|null|  荀彧|    2|   10K|
|  5|  貂蝉|  女|  28|  貂蝉|    3|    5K|
|  9|  null|null|null|  小乔|    2|   10K|
|  4|司马懿|  男|  21|司马懿|    2|   10K|
|  8|诸葛亮|  男|  22|  null| null|  null|
|  7|  大乔|  女|  21|  大乔|    3|    5K|
| 10|邢道荣|  男|  29|  null| null|  null|
|  2|  曹操|  男|  21|  曹操|    1|   15K|
+---+------+----+----+------+-----+------+
'''
  • cross join
people.join(salary, how='cross').show()
'''
+---+------+---+---+---+------+-----+------+
| id|  name|sex|age| id|  name|level|salary|
+---+------+---+---+---+------+-----+------+
|  1|孙尚香| 女| 29|  2|  曹操|    1|   15K|
|  1|孙尚香| 女| 29|  3|  荀彧|    2|   10K|
|  1|孙尚香| 女| 29|  4|司马懿|    2|   10K|
|  1|孙尚香| 女| 29|  5|  貂蝉|    3|    5K|
|  1|孙尚香| 女| 29|  6|  孙权|    3|    5K|
|  1|孙尚香| 女| 29|  7|  大乔|    3|    5K|
|  1|孙尚香| 女| 29|  9|  小乔|    2|   10K|
|  2|  曹操| 男| 21|  2|  曹操|    1|   15K|
|  2|  曹操| 男| 21|  3|  荀彧|    2|   10K|
|  2|  曹操| 男| 21|  4|司马懿|    2|   10K|
|  2|  曹操| 男| 21|  5|  貂蝉|    3|    5K|
|  2|  曹操| 男| 21|  6|  孙权|    3|    5K|
|  2|  曹操| 男| 21|  7|  大乔|    3|    5K|
|  2|  曹操| 男| 21|  9|  小乔|    2|   10K|
|  4|司马懿| 男| 21|  2|  曹操|    1|   15K|
|  4|司马懿| 男| 21|  3|  荀彧|    2|   10K|
|  4|司马懿| 男| 21|  4|司马懿|    2|   10K|
|  4|司马懿| 男| 21|  5|  貂蝉|    3|    5K|
|  4|司马懿| 男| 21|  6|  孙权|    3|    5K|
|  4|司马懿| 男| 21|  7|  大乔|    3|    5K|
+---+------+---+---+---+------+-----+------+
only showing top 20 rows
'''

people.crossJoin(salary).show()
'''
+---+------+---+---+---+------+-----+------+
| id|  name|sex|age| id|  name|level|salary|
+---+------+---+---+---+------+-----+------+
|  1|孙尚香| 女| 29|  2|  曹操|    1|   15K|
|  1|孙尚香| 女| 29|  3|  荀彧|    2|   10K|
|  1|孙尚香| 女| 29|  4|司马懿|    2|   10K|
|  1|孙尚香| 女| 29|  5|  貂蝉|    3|    5K|
|  1|孙尚香| 女| 29|  6|  孙权|    3|    5K|
|  1|孙尚香| 女| 29|  7|  大乔|    3|    5K|
|  1|孙尚香| 女| 29|  9|  小乔|    2|   10K|
|  2|  曹操| 男| 21|  2|  曹操|    1|   15K|
|  2|  曹操| 男| 21|  3|  荀彧|    2|   10K|
|  2|  曹操| 男| 21|  4|司马懿|    2|   10K|
|  2|  曹操| 男| 21|  5|  貂蝉|    3|    5K|
|  2|  曹操| 男| 21|  6|  孙权|    3|    5K|
|  2|  曹操| 男| 21|  7|  大乔|    3|    5K|
|  2|  曹操| 男| 21|  9|  小乔|    2|   10K|
|  4|司马懿| 男| 21|  2|  曹操|    1|   15K|
|  4|司马懿| 男| 21|  3|  荀彧|    2|   10K|
|  4|司马懿| 男| 21|  4|司马懿|    2|   10K|
|  4|司马懿| 男| 21|  5|  貂蝉|    3|    5K|
|  4|司马懿| 男| 21|  6|  孙权|    3|    5K|
|  4|司马懿| 男| 21|  7|  大乔|    3|    5K|
+---+------+---+---+---+------+-----+------+
only showing top 20 rows

'''
  • semi (left semi) join
# similar to an inner join, except it returns only the left table's columns, never the right table's
# effectively filters the left table by the join condition
people.join(salary, on='id', how='semi').show()
people.join(salary, on='id', how='inner').show()
'''
+---+------+---+---+
| id|  name|sex|age|
+---+------+---+---+
|  2|  曹操| 男| 21|
|  4|司马懿| 男| 21|
|  5|  貂蝉| 女| 28|
|  7|  大乔| 女| 21|
+---+------+---+---+

+---+------+---+---+------+-----+------+
| id|  name|sex|age|  name|level|salary|
+---+------+---+---+------+-----+------+
|  2|  曹操| 男| 21|  曹操|    1|   15K|
|  4|司马懿| 男| 21|司马懿|    2|   10K|
|  5|  貂蝉| 女| 28|  貂蝉|    3|    5K|
|  7|  大乔| 女| 21|  大乔|    3|    5K|
+---+------+---+---+------+-----+------+
'''
  • anti (left anti) join
# returns rows that exist in the left table but not in the right table
# only the left table's columns are returned
people.join(salary, on='id', how='anti').show()
people.join(salary, on='id', how='full').show()
'''
+---+------+---+---+
| id|  name|sex|age|
+---+------+---+---+
|  1|孙尚香| 女| 29|
|  8|诸葛亮| 男| 22|
| 10|邢道荣| 男| 29|
+---+------+---+---+

+---+------+----+----+------+-----+------+
| id|  name| sex| age|  name|level|salary|
+---+------+----+----+------+-----+------+
|  1|孙尚香|  女|  29|  null| null|  null|
|  6|  null|null|null|  孙权|    3|    5K|
|  3|  null|null|null|  荀彧|    2|   10K|
|  5|  貂蝉|  女|  28|  貂蝉|    3|    5K|
|  9|  null|null|null|  小乔|    2|   10K|
|  4|司马懿|  男|  21|司马懿|    2|   10K|
|  8|诸葛亮|  男|  22|  null| null|  null|
|  7|  大乔|  女|  21|  大乔|    3|    5K|
| 10|邢道荣|  男|  29|  null| null|  null|
|  2|  曹操|  男|  21|  曹操|    1|   15K|
+---+------+----+----+------+-----+------+
'''
  • Complex joins
salary_new = salary.withColumnRenamed('id','number')

# the join key names differ between the tables, multiple join keys are used, and the condition includes a non-equality predicate
people.join(
    salary_new
    , on=(people['id']==salary_new['number']) & (people['name']==salary_new['name']) & (people['age'] >= 25)
).show()
'''
+---+----+---+---+------+----+-----+------+
| id|name|sex|age|number|name|level|salary|
+---+----+---+---+------+----+-----+------+
|  5|貂蝉| 女| 28|     5|貂蝉|    3|    5K|
+---+----+---+---+------+----+-----+------+
'''
  • Vertical concatenation (union)
# union is equivalent to unionAll
people.union(people).show()              # keeps duplicates
people.union(people).distinct().show()   # removes duplicates
'''
+---+------+---+---+
| id|  name|sex|age|
+---+------+---+---+
|  1|孙尚香| 女| 29|
|  2|  曹操| 男| 21|
|  4|司马懿| 男| 21|
|  5|  貂蝉| 女| 28|
|  7|  大乔| 女| 21|
|  8|诸葛亮| 男| 22|
| 10|邢道荣| 男| 29|
|  1|孙尚香| 女| 29|
|  2|  曹操| 男| 21|
|  4|司马懿| 男| 21|
|  5|  貂蝉| 女| 28|
|  7|  大乔| 女| 21|
|  8|诸葛亮| 男| 22|
| 10|邢道荣| 男| 29|
+---+------+---+---+

+---+------+---+---+
| id|  name|sex|age|
+---+------+---+---+
| 10|邢道荣| 男| 29|
|  1|孙尚香| 女| 29|
|  4|司马懿| 男| 21|
|  7|  大乔| 女| 21|
|  5|  貂蝉| 女| 28|
|  2|  曹操| 男| 21|
|  8|诸葛亮| 男| 22|
+---+------+---+---+
'''
# unionByName: concatenate vertically by matching column names
people_new = people.select('age','sex','name','id')
people_new.show()
'''
+---+---+------+---+
|age|sex|  name| id|
+---+---+------+---+
| 29| 女|孙尚香|  1|
| 21| 男|  曹操|  2|
| 21| 男|司马懿|  4|
| 28| 女|  貂蝉|  5|
| 21| 女|  大乔|  7|
| 22| 男|诸葛亮|  8|
| 29| 男|邢道荣| 10|
+---+---+------+---+
'''

people.unionByName(people_new).show()
'''
+---+------+---+---+
| id|  name|sex|age|
+---+------+---+---+
|  1|孙尚香| 女| 29|
|  2|  曹操| 男| 21|
|  4|司马懿| 男| 21|
|  5|  貂蝉| 女| 28|
|  7|  大乔| 女| 21|
|  8|诸葛亮| 男| 22|
| 10|邢道荣| 男| 29|
|  1|孙尚香| 女| 29|
|  2|  曹操| 男| 21|
|  4|司马懿| 男| 21|
|  5|  貂蝉| 女| 28|
|  7|  大乔| 女| 21|
|  8|诸葛亮| 男| 22|
| 10|邢道荣| 男| 29|
+---+------+---+---+
'''

3. Grouping and aggregation

df = spark.read.csv("data/order.csv", header=True, inferSchema=True)
df.show(5)
'''
+-----------+----------+------+--------+------------+------+--------+--------+
|   order_id|order_date|region|province|product_type| price|quantity|  profit|
+-----------+----------+------+--------+------------+------+--------+--------+
|10021265709|2010-10-13|  华北|    河北|    办公用品| 38.94|       6| -213.25|
|10021250753|2012-02-20|  华南|    河南|    办公用品|  2.08|       2|   -4.64|
|10021257699|2011-07-15|  华南|    广东|    家具产品|107.53|      26| 1054.82|
|10021258358|2011-07-15|  华北|  内蒙古|    家具产品| 70.89|      24|-1748.56|
|10021249836|2011-07-15|  华北|  内蒙古|    数码电子|  7.99|      23|  -85.13|
+-----------+----------+------+--------+------------+------+--------+--------+
only showing top 5 rows
'''
  • groupBy
  • Group by region and sum the quantity
df.groupBy('region').sum('quantity').show()
df.groupBy('region').agg({'quantity':'sum'}).show()
'''
+------+-------------+
|region|sum(quantity)|
+------+-------------+
|  华东|        38872|
|  西北|        16958|
|  华南|        82102|
|  华北|        42515|
|  东北|        31951|
|  西南|         6432|
+------+-------------+

+------+-------------+
|region|sum(quantity)|
+------+-------------+
|  华东|        38872|
|  西北|        16958|
|  华南|        82102|
|  华北|        42515|
|  东北|        31951|
|  西南|         6432|
+------+-------------+
'''

from pyspark.sql import functions as f
df.groupBy('region').agg(f.sum('quantity').name('quantity_sum')).show()
'''
+------+------------+
|region|quantity_sum|
+------+------------+
|  华东|       38872|
|  西北|       16958|
|  华南|       82102|
|  华北|       42515|
|  东北|       31951|
|  西南|        6432|
+------+------------+
'''
  • groupBy+agg
  • Group by region, compute the mean price, and collect the set of product types
# collect_list keeps duplicates; collect_set removes them
# mean is equivalent to avg
from pyspark.sql import functions as f
df.groupBy('region').agg(
#     f.avg('price').name('price_avg')
    f.mean('price').name('price_avg')
    , f.collect_set('product_type').alias('product_set')
).show()

# equivalent to the above
df.groupBy('region').agg(
    f.expr('avg(price) as price_avg')
    , f.expr('collect_set(product_type) as product_set')
).show()
'''
+------+-----------------+------------------------------+
|region|        price_avg|                   product_set|
+------+-----------------+------------------------------+
|  华东|98.88595469255712|[数码电子, 办公用品, 家具产品]|
|  西北|92.77338368580075|[数码电子, 办公用品, 家具产品]|
|  华南|86.23637962962982|[数码电子, 办公用品, 家具产品]|
|  华北|92.05539156626534|[数码电子, 办公用品, 家具产品]|
|  东北| 73.0615702479338|[数码电子, 办公用品, 家具产品]|
|  西南|89.90359999999998|[数码电子, 办公用品, 家具产品]|
+------+-----------------+------------------------------+

+------+-----------------+------------------------------+
|region|        price_avg|                   product_set|
+------+-----------------+------------------------------+
|  华东|98.88595469255712|[数码电子, 办公用品, 家具产品]|
|  西北|92.77338368580075|[数码电子, 办公用品, 家具产品]|
|  华南|86.23637962962982|[数码电子, 办公用品, 家具产品]|
|  华北|92.05539156626534|[数码电子, 办公用品, 家具产品]|
|  东北| 73.0615702479338|[数码电子, 办公用品, 家具产品]|
|  西南|89.90359999999998|[数码电子, 办公用品, 家具产品]|
+------+-----------------+------------------------------+
'''
  • groupBy+agg
  • Group by region and province and compute the sales amount (price * quantity)
from pyspark.sql import functions as f
df.withColumn('amount', df['price']*df['quantity']).groupBy('region','province').agg(f.sum('amount').name('amount')).show()
'''
+------+--------+------------------+
|region|province|            amount|
+------+--------+------------------+
|  华北|    河北|365382.67999999993|
|  华南|    河南| 679270.2199999999|
|  西北|    甘肃|507529.73000000033|
|  东北|    吉林| 554975.2899999995|
|  华南|    广东|        2499042.95|
|  西北|    新疆|         120412.68|
|  华北|    北京|         650718.85|
|  华东|    浙江|1280734.0600000003|
|  华北|    山西| 786083.3999999999|
|  西南|    四川|200752.48999999993|
|  西北|    青海|31500.249999999993|
|  华北|    天津| 511124.1199999998|
|  西北|    陕西| 315693.5299999999|
|  华北|  内蒙古| 860658.1999999998|
|  华东|    福建|171204.19000000015|
|  华南|    湖南|269664.09999999986|
|  华东|    安徽| 585314.3999999994|
|  西南|    重庆|          63724.83|
|  华东|    山东|260602.88000000006|
|  西南|    贵州|102337.31000000001|
+------+--------+------------------+
only showing top 20 rows
'''
  • groupBy+pivot
  • Pivot table: rows are region, columns are product type, values are quantity, aggregation is sum
df.groupBy('region').pivot('product_type').sum('quantity').show()
'''
+------+--------+--------+--------+
|region|办公用品|家具产品|数码电子|
+------+--------+--------+--------+
|  华东|   20525|    8775|    9572|
|  西北|    9796|    3123|    4039|
|  华南|   45635|   16457|   20010|
|  华北|   24213|    8333|    9969|
|  东北|   17286|    6880|    7785|
|  西南|    3154|    1537|    1741|
+------+--------+--------+--------+
'''
  • crosstab
  • Cross tabulation of frequencies: rows are region, columns are product type
df.crosstab('region','product_type').show()
'''
+-------------------+--------+--------+--------+
|region_product_type|办公用品|家具产品|数码电子|
+-------------------+--------+--------+--------+
|               华东|     832|     330|     383|
|               华南|    1775|     660|     805|
|               华北|     939|     327|     394|
|               东北|     647|     266|     297|
|               西南|     127|      56|      67|
|               西北|     373|     130|     159|
+-------------------+--------+--------+--------+
'''
  • Window functions
  • Extract the first order of each product type
from pyspark.sql import Window
from pyspark.sql import functions as f
# row_number() over()
window = Window.partitionBy('product_type').orderBy(df['order_date'].asc()).rowsBetween(Window.unboundedPreceding,Window.currentRow)
# Window.partitionBy    partition (grouping) columns
# Window.orderBy        ordering columns
# Window.rowsBetween    window frame defined by row offsets
# Window.rangeBetween   window frame defined by value ranges
# Window.unboundedFollowing    last row of the partition
# Window.unboundedPreceding    first row of the partition
# Window.currentRow     the current row
# Window.rowsBetween(-3, 3)    the three rows before (-3) and the three rows after (3) the current row

df.select('order_id','order_date','product_type').withColumn('rn', f.row_number().over(window)).where('rn=1').show()

# equivalent to the above
df.selectExpr('order_id','order_date','product_type','row_number() over(partition by product_type order by order_date asc) as rn').where('rn=1').show()
'''
+-----------+----------+------------+---+
|   order_id|order_date|product_type| rn|
+-----------+----------+------------+---+
|10021269291|2009-01-03|    数码电子|  1|
|10021350632|2009-01-01|    办公用品|  1|
|10021454566|2009-01-02|    家具产品|  1|
+-----------+----------+------------+---+

+-----------+----------+------------+---+
|   order_id|order_date|product_type| rn|
+-----------+----------+------------+---+
|10021269291|2009-01-03|    数码电子|  1|
|10021350632|2009-01-01|    办公用品|  1|
|10021454566|2009-01-02|    家具产品|  1|
+-----------+----------+------------+---+
'''
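
# To make rowsBetween more concrete, a minimal sketch of a rolling sum
# (the frame of two preceding rows plus the current row is an illustrative choice, not from the original):
from pyspark.sql import Window
from pyspark.sql import functions as f

# rolling sum of quantity over the current row and the two rows before it, per product type
rolling = (Window.partitionBy('product_type')
                 .orderBy('order_date')
                 .rowsBetween(-2, Window.currentRow))

df.select('order_id', 'order_date', 'product_type', 'quantity') \
  .withColumn('rolling_qty', f.sum('quantity').over(rolling)) \
  .show(5)
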
  • Enhanced aggregation (rollup / cube)
  • Group by region and product type and sum the quantity
# rollup: hierarchical aggregation from left to right; no subtotal for product_type alone
df.rollup('region','product_type').sum('quantity').orderBy('region','product_type').show()

# cube: aggregates over every combination, including subtotals for product_type alone
df.cube('region','product_type').sum('quantity').orderBy('region','product_type').show()
'''
+------+------------+-------------+
|region|product_type|sum(quantity)|
+------+------------+-------------+
|  null|        null|       218830|
|  东北|        null|        31951|
|  东北|    办公用品|        17286|
|  东北|    家具产品|         6880|
|  东北|    数码电子|         7785|
|  华东|        null|        38872|
|  华东|    办公用品|        20525|
|  华东|    家具产品|         8775|
|  华东|    数码电子|         9572|
|  华北|        null|        42515|
|  华北|    办公用品|        24213|
|  华北|    家具产品|         8333|
|  华北|    数码电子|         9969|
|  华南|        null|        82102|
|  华南|    办公用品|        45635|
|  华南|    家具产品|        16457|
|  华南|    数码电子|        20010|
|  西北|        null|        16958|
|  西北|    办公用品|         9796|
|  西北|    家具产品|         3123|
+------+------------+-------------+
only showing top 20 rows

+------+------------+-------------+
|region|product_type|sum(quantity)|
+------+------------+-------------+
|  null|        null|       218830|
|  null|    办公用品|       120609|
|  null|    家具产品|        45105|
|  null|    数码电子|        53116|
|  东北|        null|        31951|
|  东北|    办公用品|        17286|
|  东北|    家具产品|         6880|
|  东北|    数码电子|         7785|
|  华东|        null|        38872|
|  华东|    办公用品|        20525|
|  华东|    家具产品|         8775|
|  华东|    数码电子|         9572|
|  华北|        null|        42515|
|  华北|    办公用品|        24213|
|  华北|    家具产品|         8333|
|  华北|    数码电子|         9969|
|  华南|        null|        82102|
|  华南|    办公用品|        45635|
|  华南|    家具产品|        16457|
|  华南|    数码电子|        20010|
+------+------------+-------------+
only showing top 20 rows
'''

4. Stratified (group-wise) sampling

  • Sample 10% of 办公用品, 20% of 家具产品 and 30% of 数码电子
dct = {'办公用品':0.1,'家具产品':0.2,'数码电子':0.3}
sample = df.sampleBy(col='product_type',fractions=dct,seed=1)
df.groupBy('product_type').count().show()      # before sampling
sample.groupBy('product_type').count().show()  # after sampling
'''
+------------+-----+
|product_type|count|
+------------+-----+
|    数码电子| 2105|
|    办公用品| 4693|
|    家具产品| 1769|
+------------+-----+

+------------+-----+
|product_type|count|
+------------+-----+
|    数码电子|  656|
|    办公用品|  480|
|    家具产品|  361|
+------------+-----+
'''

5. Calling user-defined functions

  • Compute the number of days from order_date to today
  • Calling a user-defined function from selectExpr()
from pyspark.sql import functions as f
from pyspark.sql import types as t
import datetime

def get_days(date:str) -> int:
    end_date = datetime.datetime.now()
    start_date = datetime.datetime.strptime(date,'%Y-%m-%d')
    days = (end_date - start_date).days
    return days

spark.udf.register('get_days', get_days, t.IntegerType())  # register the function with Spark

df.selectExpr(
    'order_id'
    , 'order_date'
    , 'get_days(order_date) as get_days'
).show(5)
'''
+-----------+----------+--------+
|   order_id|order_date|get_days|
+-----------+----------+--------+
|10021265709|2010-10-13|    4389|
|10021250753|2012-02-20|    3894|
|10021257699|2011-07-15|    4114|
|10021258358|2011-07-15|    4114|
|10021249836|2011-07-15|    4114|
+-----------+----------+--------+
only showing top 5 rows
'''

df.selectExpr(
    'order_id'
    , 'order_date'
    , 'datediff(current_date(),order_date) as datediff'
).show(5)
'''
+-----------+----------+--------+
|   order_id|order_date|datediff|
+-----------+----------+--------+
|10021265709|2010-10-13|    4389|
|10021250753|2012-02-20|    3894|
|10021257699|2011-07-15|    4114|
|10021258358|2011-07-15|    4114|
|10021249836|2011-07-15|    4114|
+-----------+----------+--------+
only showing top 5 rows
'''
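
# Besides spark.udf.register for SQL expressions, a plain Python function can also be wrapped with
# functions.udf and used directly in the DataFrame API; a minimal sketch reusing the get_days function defined above:
from pyspark.sql import functions as f
from pyspark.sql import types as t

# wrap the Python function as a column-level UDF
get_days_udf = f.udf(get_days, t.IntegerType())

df.select('order_id', 'order_date', get_days_udf('order_date').alias('get_days')).show(5)
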
  • Filter rows where region is 华南 or 华北 and price is greater than 100, sorted by price in descending order
  • Calling a custom function with transform()
def filter_sort(df):
    # Spark syntax
    df1 = df.where("region in ('华北','华南') and price > 100")
    df2 = df1.orderBy(df1['price'].desc())
    return df2

df.transform(filter_sort).show(5)

'''
+-----------+----------+------+--------+------------+-------+--------+--------+
|   order_id|order_date|region|province|product_type|  price|quantity|  profit|
+-----------+----------+------+--------+------------+-------+--------+--------+
|10021415235|2012-05-21|  华南|    广东|    数码电子|6783.02|       8| 3852.19|
|10021396651|2009-03-21|  华北|    山西|    数码电子|6783.02|      13|27220.69|
|10021249830|2011-11-27|  华北|  内蒙古|    数码电子|6783.02|       3|-11984.4|
|10021263833|2009-07-28|  华北|    北京|    数码电子|3502.14|       4| -6923.6|
|10021349573|2010-01-18|  华南|    湖南|    数码电子|3502.14|       3|-8389.47|
+-----------+----------+------+--------+------------+-------+--------+--------+
only showing top 5 rows
'''

6. Calling Pandas functions

  • Compute the deviation of price from its mean
  • Calling a Pandas function with pandas_udf()
  • UDF: user-defined function
import pandas as pd
from pyspark.sql import types as t
from pyspark.sql.functions import pandas_udf

# pandas_udf used as a decorator

@pandas_udf(t.FloatType())  # the Spark data type of the return value must be declared
def dev_udf(s:pd.Series) -> pd.Series:
    return s - s.mean()
    

df.withColumn('dev', dev_udf('price')).show(5)

'''
+-----------+----------+------+--------+------------+------+--------+--------+----------+
|   order_id|order_date|region|province|product_type| price|quantity|  profit|       dev|
+-----------+----------+------+--------+------------+------+--------+--------+----------+
|10021265709|2010-10-13|  华北|    河北|    办公用品| 38.94|       6| -213.25| -49.45652|
|10021250753|2012-02-20|  华南|    河南|    办公用品|  2.08|       2|   -4.64| -86.31652|
|10021257699|2011-07-15|  华南|    广东|    家具产品|107.53|      26| 1054.82| 19.133478|
|10021258358|2011-07-15|  华北|  内蒙古|    家具产品| 70.89|      24|-1748.56|-17.506521|
|10021249836|2011-07-15|  华北|  内蒙古|    数码电子|  7.99|      23|  -85.13|-80.406525|
+-----------+----------+------+--------+------------+------+--------+--------+----------+
only showing top 5 rows
'''
  • Filter rows where region is 东北 and profit is greater than zero
  • Calling a Pandas function with mapInPandas()
def filter_func(iterator):
    for pdf in iterator:
        # a function containing the yield keyword is a generator
        # a generator function returns an iterator
        yield pdf.query(' region == "东北" and profit > 0 ')

# schema describes the structure of the DataFrame returned by the function
# schema = 'col1 type1, col2 type2'
df.mapInPandas(filter_func,df.schema).show(5)
'''
+-----------+----------+------+--------+------------+------+--------+-------+
|   order_id|order_date|region|province|product_type| price|quantity| profit|
+-----------+----------+------+--------+------------+------+--------+-------+
|10021266565|2011-10-22|  东北|    吉林|    办公用品|  9.11|      30|  60.72|
|10021255969|2010-06-28|  东北|    辽宁|    数码电子|140.99|      47|1680.79|
|10021261550|2012-01-12|  东北|    辽宁|    办公用品|   9.9|      43| 175.54|
|10021265879|2011-06-17|  东北|  黑龙江|    办公用品| 42.76|      22|  127.7|
|10021256317|2012-11-30|  东北|    吉林|    家具产品| 14.34|      41| 163.81|
+-----------+----------+------+--------+------------+------+--------+-------+
only showing top 5 rows
'''
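
# A closely related API, not shown in the original, is applyInPandas, which applies a Pandas function per group.
# A sketch, assuming the (illustrative) goal is to subtract each region's mean profit:
import pandas as pd

def demean_profit(pdf: pd.DataFrame) -> pd.DataFrame:
    # subtract the group's mean profit from every row of the group
    pdf['profit'] = pdf['profit'] - pdf['profit'].mean()
    return pdf

df.groupBy('region').applyInPandas(demean_profit, schema=df.schema).show(5)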

7. Caching

# cache a DataFrame that is used repeatedly
df.cache()

# data processing steps
# ......

# release the cache once the DataFrame is no longer needed
df.unpersist()

# check whether the DataFrame is cached
df.is_cached
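
# cache() uses the default storage level; persist() lets the level be chosen explicitly.
# A minimal sketch (the storage level and the count() trigger are illustrative):
from pyspark import StorageLevel

df.persist(StorageLevel.MEMORY_AND_DISK)  # keep in memory, spill to disk if it does not fit
df.count()                                # caching is lazy; an action materializes it
df.unpersist()                            # release once the DataFrame is no longer needed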

5. SparkSQL: SQL interaction with DataFrames

import pyspark
from pyspark.sql import SparkSession
import findspark
findspark.init()
spark = SparkSession \
    .builder \
    .appName("test") \
    .master("local[*]") \
    .enableHiveSupport() \
    .getOrCreate()
sc = spark.sparkContext

1. Reading data

df = spark.read.csv("data/order.csv", header=True, inferSchema=True)
df.show(5)
'''
+-----------+----------+------+--------+------------+------+--------+--------+
|   order_id|order_date|region|province|product_type| price|quantity|  profit|
+-----------+----------+------+--------+------------+------+--------+--------+
|10021265709|2010-10-13|  华北|    河北|    办公用品| 38.94|       6| -213.25|
|10021250753|2012-02-20|  华南|    河南|    办公用品|  2.08|       2|   -4.64|
|10021257699|2011-07-15|  华南|    广东|    家具产品|107.53|      26| 1054.82|
|10021258358|2011-07-15|  华北|  内蒙古|    家具产品| 70.89|      24|-1748.56|
|10021249836|2011-07-15|  华北|  内蒙古|    数码电子|  7.99|      23|  -85.13|
+-----------+----------+------+--------+------------+------+--------+--------+
only showing top 5 rows
'''

2. Creating temporary views

2.1. Creating a local temporary view

# valid only within the current SparkSession
df.createOrReplaceTempView('order')

spark.sql('select * from order').show(5)
'''
+-----------+----------+------+--------+------------+------+--------+--------+
|   order_id|order_date|region|province|product_type| price|quantity|  profit|
+-----------+----------+------+--------+------------+------+--------+--------+
|10021265709|2010-10-13|  华北|    河北|    办公用品| 38.94|       6| -213.25|
|10021250753|2012-02-20|  华南|    河南|    办公用品|  2.08|       2|   -4.64|
|10021257699|2011-07-15|  华南|    广东|    家具产品|107.53|      26| 1054.82|
|10021258358|2011-07-15|  华北|  内蒙古|    家具产品| 70.89|      24|-1748.56|
|10021249836|2011-07-15|  华北|  内蒙古|    数码电子|  7.99|      23|  -85.13|
+-----------+----------+------+--------+------------+------+--------+--------+
only showing top 5 rows
'''

# a new SparkSession cannot query it; the following raises an error
spark.newSession().sql('select * from order').show(5)

2.2. Creating a global temporary view

# valid for the whole Spark application (all SparkSessions)
df.createOrReplaceGlobalTempView('global_order')

spark.sql('select * from global_temp.global_order').show(5)
'''
+-----------+----------+------+--------+------------+------+--------+--------+
|   order_id|order_date|region|province|product_type| price|quantity|  profit|
+-----------+----------+------+--------+------------+------+--------+--------+
|10021265709|2010-10-13|  华北|    河北|    办公用品| 38.94|       6| -213.25|
|10021250753|2012-02-20|  华南|    河南|    办公用品|  2.08|       2|   -4.64|
|10021257699|2011-07-15|  华南|    广东|    家具产品|107.53|      26| 1054.82|
|10021258358|2011-07-15|  华北|  内蒙古|    家具产品| 70.89|      24|-1748.56|
|10021249836|2011-07-15|  华北|  内蒙古|    数码电子|  7.99|      23|  -85.13|
+-----------+----------+------+--------+------------+------+--------+--------+
only showing top 5 rows
'''

# a new SparkSession can query it without error
spark.newSession().sql('select * from global_temp.global_order').show(5)
'''
+-----------+----------+------+--------+------------+------+--------+--------+
|   order_id|order_date|region|province|product_type| price|quantity|  profit|
+-----------+----------+------+--------+------------+------+--------+--------+
|10021265709|2010-10-13|  华北|    河北|    办公用品| 38.94|       6| -213.25|
|10021250753|2012-02-20|  华南|    河南|    办公用品|  2.08|       2|   -4.64|
|10021257699|2011-07-15|  华南|    广东|    家具产品|107.53|      26| 1054.82|
|10021258358|2011-07-15|  华北|  内蒙古|    家具产品| 70.89|      24|-1748.56|
|10021249836|2011-07-15|  华北|  内蒙古|    数码电子|  7.99|      23|  -85.13|
+-----------+----------+------+--------+------------+------+--------+--------+
only showing top 5 rows
'''

3. Creating, dropping, modifying and querying Hive tables

3.1. Creating a database

spark.sql('create database if not exists test')

3.2. Dropping a table

# create the table
spark.sql('create table if not exists test.tmp_table(id int, name string)')

# drop the table
spark.sql('drop table if exists  test.tmp_table')

3.3. Creating a managed (internal) table

  • Read local data
df = spark.read.csv('data/student.csv', inferSchema=True).toDF('id','name','birth','sex','class')
df.show(5)
'''
+---+----+----------+---+-----+
| id|name|     birth|sex|class|
+---+----+----------+---+-----+
|  1|赵雷|1990-01-01| 男|    1|
|  2|钱电|1990-12-21| 男|    1|
|  3|孙风|1990-12-20| 男|    1|
|  6|吴兰|1991-01-01| 女|    1|
|  7|郑竹|1991-01-01| 女|    1|
+---+----+----------+---+-----+
only showing top 5 rows
'''
  • Create the managed table
sqlquery='''
create table if not exists test.student_v1(
    id int,
    name string,
    birth string,
    sex string,
    class int
)
row format delimited
fields terminated by ","
'''
spark.sql(sqlquery)

3.4. Writing to the managed table

  • Run the CREATE TABLE statement first, then insert the data
# append data
df.write.mode('append').format('hive').saveAsTable('test.student_v1')

spark.sql('select * from test.student_v1').show(5)
'''
+---+----+----------+---+-----+
| id|name|     birth|sex|class|
+---+----+----------+---+-----+
|  1|赵雷|1990-01-01| 男|    1|
|  2|钱电|1990-12-21| 男|    1|
|  3|孙风|1990-12-20| 男|    1|
|  6|吴兰|1991-01-01| 女|    1|
|  7|郑竹|1991-01-01| 女|    1|
+---+----+----------+---+-----+
only showing top 5 rows
'''

# overwrite data
df.write.mode('overwrite').format('hive').saveAsTable('test.student_v1')
  • Insert data directly without running a CREATE TABLE statement
# creates the table if it does not exist; raises an error if the table already exists
df.writeTo('test.student_v2').create()

spark.sql('select * from test.student_v2').show(5)
'''
+---+----+----------+---+-----+
| id|name|     birth|sex|class|
+---+----+----------+---+-----+
|  1|赵雷|1990-01-01| 男|    1|
|  2|钱电|1990-12-21| 男|    1|
|  3|孙风|1990-12-20| 男|    1|
|  6|吴兰|1991-01-01| 女|    1|
|  7|郑竹|1991-01-01| 女|    1|
+---+----+----------+---+-----+
only showing top 5 rows
'''
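
# The writeTo (DataFrameWriterV2) API also offers append and createOrReplace; support depends on the Spark version
# and the catalog in use, so treat the following as an assumption to verify rather than guaranteed behaviour:
df.writeTo('test.student_v2').append()           # append rows to an existing table
df.writeTo('test.student_v2').createOrReplace()  # create if absent, otherwise replace (needs REPLACE TABLE support)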

3.5. Creating a partitioned table

sqlquery = """
create table if not exists test.student_p(
id int,
name string,
birth string,
sex string)
partitioned by (class int)
row format delimited 
fields terminated by ','
"""
spark.sql(sqlquery)

3.6. Dynamic writes to the partitioned table

# read local data
df = spark.read.csv('data/student.csv', inferSchema=True) \
    .toDF('id', 'name', 'birth', 'sex', 'class')
df.show(1)
'''
+---+----+----------+---+-----+
| id|name|     birth|sex|class|
+---+----+----------+---+-----+
|  1|赵雷|1990-01-01| 男|    1|
+---+----+----------+---+-----+
only showing top 1 row
'''

# enable dynamic partitioning in non-strict mode
spark.conf.set('hive.exec.dynamic.partition.mode', 'nonstrict')

df.write.mode('overwrite').format('hive') \
    .partitionBy('class').saveAsTable('test.student_p')

spark.sql('select * from test.student_p').show(1)
'''
+---+----+----------+---+-----+
| id|name|     birth|sex|class|
+---+----+----------+---+-----+
|  1|赵雷|1990-01-01| 男|    1|
+---+----+----------+---+-----+
only showing top 1 row
'''

3.7. Static writes to the partitioned table

df_one = spark.createDataFrame(
    data=[(66, '大山', '2000-01-01', '男', 1)],
    schema=['id', 'name', 'birth', 'sex', 'class'])
df_one.show()
'''
+---+----+----------+---+-----+
| id|name|     birth|sex|class|
+---+----+----------+---+-----+
| 66|大山|2000-01-01| 男|    1|
+---+----+----------+---+-----+
'''

# create a local temporary view
df_one.createOrReplaceTempView('df_one')

sqlquery = """
insert into table test.student_p partition(class=1)
select id, name, birth, sex from df_one
"""
spark.sql(sqlquery)

spark.sql('select * from test.student_p').show()
'''
+---+----+----------+---+-----+
| id|name|     birth|sex|class|
+---+----+----------+---+-----+
|  1|赵雷|1990-01-01| 男|    1|
|  2|钱电|1990-12-21| 男|    1|
|  3|孙风|1990-12-20| 男|    1|
|  6|吴兰|1991-01-01| 女|    1|
|  7|郑竹|1991-01-01| 女|    1|
| 13|孙七|1992-06-01| 女|    1|
| 66|大山|2000-01-01| 男|    1|
|  4|李云|1990-12-06| 男|    2|
|  9|张三|1992-12-20| 女|    2|
|  5|周梅|1991-12-01| 女|    2|
| 12|赵六|1990-06-13| 女|    2|
| 17|陈三|1991-10-10| 男|    2|
| 14|郑双|1993-06-01| 女|    3|
| 15|王一|1992-05-01| 男|    3|
| 16|冯二|1990-01-02| 男|    3|
| 10|李四|1992-12-25| 女|    3|
| 11|李四|1991-06-06| 女|    3|
|  8|梅梅|1992-06-07| 女|    3|
+---+----+----------+---+-----+
'''

Reposted from: https://blog.csdn.net/qq_52421831/article/details/127383624
Copyright belongs to the original author 呆子不呆X. In case of infringement, please contact us for removal.
