0


Spark SQL之DataFrame,df对象的创建与使用

  • 学习:知识的初次邂逅
  • 复习:知识的温故知新
  • 练习:知识的实践应用

经历了前几篇的大占比理论之后,本篇代码会比较多,重点是df对象的创建与使用

一,DataFrame的创建

  • 创建datafram数据
  • 需要使用一个sparksession的类创建
  • SparkSession类是在SparkContext的基础上进行了封装
  • 也就是SparkSession类中包含了SparkContext

1,使用row+schema组合创建

# encoding=utf-8
from pyspark.sql import Row,SparkSession
from pyspark.sql.types import *

#创建行格式
r1 = Row(1,'凡梦',18,'男')
r2 = Row(id=2,name = '七七',age = 18,sex = '女')

#创建表数据结构
schema_type = (StructType()
               .add('id',IntegerType(),nullable=False)
               .add('name',StringType())
               .add('age',IntegerType())
               .add('sex',StringType()))

#创建df对象
#固定写法 第一步生成spark会话对象
ss = SparkSession.builder.getOrCreate()
#第二步 生成df对象
df = ss.createDataFrame([r1,r2],schema=schema_type)

print(df.show())
print('-'*50)
print(df)
print('-'*50)
print(df.printSchema())

+---+----+---+---+
| id|name|age|sex|
+---+----+---+---+
| 1|凡梦| 18| 男|
| 2|七七| 18| 女|
+---+----+---+---+

None

DataFrame[id: int, name: string, age: int, sex: string]

root
|-- id: integer (nullable = false)
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- sex: string (nullable = true)

None

2,把RDD转为DataFrame对象

# encoding=utf-8

#这么创建RDD对象会直接报错
'''
from pyspark import SparkContext
sc = SparkContext()
rdd = sc.parallelize([[1,'张三'],[2,'王五']])
'''
from pyspark.sql import SparkSession
#创建 sc 对象
ss = SparkSession.builder.getOrCreate()
sc = ss.sparkContext
rdd = sc.parallelize([[1,'张三'],[2,'李四']])

# 转rdd为df对象
df = rdd.toDF(schema='id int,name string')
df.show()
print('查看表结构')
df.printSchema()

#df转回RDD
rdd2 = df.rdd
#转回的RDD
print(rdd2.collect())

+---+----+
| id|name|
+---+----+
| 1|张三|
| 2|李四|
+---+----+

查看表结构
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)

[Row(id=1, name='张三'), Row(id=2, name='李四')]

3,把pandas中的df对象转为spark的DataFrame对象

# encoding=utf-8

'''
pandas中的dataframe是 由一列一列series构成
spark中的dataframe是 有一行一行的row构成

pandas中的dataframe是单机的
spark中的dataframe是分布式的

他们之间的方法不是互通的

spark中创建dataframe的方法有
一,row+源数据 转为dataframe
二,rdd    转为dataframe
三,pandas中的df对象 转为dataframe
'''

from pyspark.sql import SparkSession
import pandas as pd

#创建pandas中的dataframe对象
data = [[1, 'A', 'm', 2500], [2, 'B', 'f', 1500], [3, 'C', 'm', 5500], [4, 'D', 'f', 500]]
pandas_df = (pd.DataFrame(data, columns=['id', 'name', 'sex', 'salary'])
          .astype({'id':'Int64', 'name':'object', 'sex':'object', 'salary':'Int64'}))

print('*'*50,'下面开始是pandas中的查看datafarme数据结构与数据内容')
print(pandas_df)  #打印数据
print('查看数据结构')
print(pandas_df.info())

print('*'*50,'下面开始是spark中的查看ddatafarme数据结构与数据内容')
#把pandas中的df对象转为spark中的pandas对象
ss = SparkSession.builder.getOrCreate()
# createDataFrame  不仅仅可以放row和源数据进去  生成dataframe
#也可以直接放入 pandas中的 df对象 从而生成 spark中的dataframe
spark_df = ss.createDataFrame(pandas_df)
spark_df.show()
print('查看数据结构')
spark_df.printSchema()

#todo ---------------------------演示取值的明细区别------------------------------
print('*'*50,'下面开始是pandas中的datafarme')
#按行取值 pandas中的dataframe
res = pandas_df['name']  #获取指定的列   这个列是一个series对象
print(res)
print(res[0])   #根据列 取索引第一个

print('*'*50,'下面开始是spark中的datafarme')
#按列取值 spark中的datafarme
res1 = spark_df.limit(1).first() # 获取指定的行  这一行是个row对象
print(res1)
print(res1['name'])  #根据此行的信息 取name对应的值

#todo --------------------------把spark_df 转为pandas_df------------------------------
pa_df2 =spark_df.toPandas()
print(pa_df2)

************************************************** 下面开始是pandas中的查看datafarme数据结构与数据内容
id name sex salary
0 1 A m 2500
1 2 B f 1500
2 3 C m 5500
3 4 D f 500
查看数据结构
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4 entries, 0 to 3
Data columns (total 4 columns):

Column Non-Null Count Dtype


0 id 4 non-null Int64
1 name 4 non-null object
2 sex 4 non-null object
3 salary 4 non-null Int64
dtypes: Int64(2), object(2)
memory usage: 264.0+ bytes
None
************************************************** 下面开始是spark中的查看ddatafarme数据结构与数据内容
Warning: Ignoring non-Spark config property: ;
24/10/13 23:10:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/10/13 23:10:30 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
+---+----+---+------+
| id|name|sex|salary|
+---+----+---+------+
| 1| A| m| 2500|
| 2| B| f| 1500|
| 3| C| m| 5500|
| 4| D| f| 500|
+---+----+---+------+

查看数据结构
root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
|-- sex: string (nullable = true)
|-- salary: long (nullable = true)

************************************************** 下面开始是pandas中的datafarme
0 A
1 B
2 C
3 D
Name: name, dtype: object
A
************************************************** 下面开始是spark中的datafarme
Row(id=1, name='A', sex='m', salary=2500)
A
id name sex salary
0 1 A m 2500
1 2 B f 1500
2 3 C m 5500
3 4 D f 500

4,读取文件创建DataFrame对象

# encoding=utf-8

'''
sparksql 支持对非结构化数据
和半结构化数据
的数据读取
'''

from pyspark.sql import SparkSession

#创建会话对象
ss =SparkSession.builder.getOrCreate()

# 读取csv文件  设置是否获取表头  与指定分隔符
df = ss.read.csv('hdfs://node1:8020/test/students.csv',header=True,sep=',')
df.show()

#读取csv文件 也可以自己指定表头
df1 = ss.read.csv('hdfs://node1:8020/test/students.csv',header=False,sep=',',schema='uid string,uname string,sex string,age string,cls string')
df1.show()

#读取json数据  会自动把键转为字段名 把只作为列的值
df_json = ss.read.json('hdfs://node1:8020/test/employees.json')
df_json.show()

#读取orc格式数据
df_orc = ss.read.orc('hdfs://node1:8020/test/users.orc')
df_orc.show()

#获取parquet数据
df_parquet = ss.read.parquet('hdfs://node1:8020/test/users.parquet')
df_parquet.show()

二,DataFrame的SQL使用方法

使用sparksession提供的sql方法,编写sql语句执行

  • 第一步 创建sparksql的会话对象
  • 第二步 读取数据
  • 第三步 创建临时表
  • 第四步 编写sql
  • 第五步 执行sql
  • 第六步 查看结果
# encoding=utf-8

'''
第一步 创建sparksql的会话对象
第二步 读取数据
第三步 创建临时表
第四步 编写sql
第五步 执行sql
第六步 查看结果
'''

from pyspark.sql import  SparkSession
ss =SparkSession.builder.getOrCreate()
df = ss.read.csv('hdfs://node1:8020/test/students.csv',header=True,sep=',')
df.createTempView('tb_student')
sql_str = '''
    select gender,avg(age) as avg_age from tb_student group by gender
'''
res = ss.sql(sql_str)
res.show()

三,DataFrame的DSL使用方法

DSL方法是df提供的数据操作函数


  • 使用方式
  • df.方法()
  • 可以进行链式调用
  • df.方法().方法().方法()
  • 方法执行后返回一个新的df保存计算结果
  • new_df = df.方法()
  1. spark提供DSL方法和sql的关键词一样,使用方式和sql基本类似,在进行数据处理时,要按照sql的执行顺序去思考如何处理数据
  2. from join 知道数据在哪 df本身就是要处理的数据 df.join(df2) from 表
  3. where 过滤需要处理的数据 df.join(df2).where()
  4. group by 聚合 数据的计算 df.join(df2).where().groupby().sum()
  5. having 计算后的数据进行过滤 df.join(df2).where().groupby().sum().where()
  6. select 展示数据的字段 df.join(df2).where().groupby().sum().where().select()
  7. order by 展示数据的排序 df.join(df2).where().groupby().sum().where().select().orderBy()
  8. limit 展示数据的数量 df.join(df2).where().groupby().sum().where().select().orderBy().limit()

**

DSL方法执行完成后会得到一个处理后的新的df

**

1,select映射指定的列,以及取别名

# encoding=utf-8

'''
第一步 创建ss对象
第二步 导入scv数据
第三步 开始查询
'''

from pyspark.sql import SparkSession
ss =SparkSession.builder.getOrCreate()

df = ss.read.csv('hdfs://node1:8020/test/students.csv',header=True,sep=',')

df.show()

#todo ------------------------取指定的列----------------------
#查询指定的列
df.select('id','name').show(5)
#另外一种方式
df.select(df.id,df.gender).show(5)
#切片的方式
df.select(df['name'],df['cls']).show(5)

#todo ------------------------取别名------------------------
df.select(df.id.alias('user_id')).show(5)

#todo ------------------------更改数据类型------------------------
df.printSchema()
df.select(df.age.cast('int')).printSchema()

2,过滤where的运用

# encoding=utf-8
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()

df = ss.read.csv('hdfs://node1:8020/test/students.csv',header=True,sep=',')

# 普通查询  本质是调用了filter
df.where('age>20').show()
# 多条件查询
df.where("age>=20 and gender='女'").show()
# 使用python中的方法
df.where(df.age==20).show()

3,分组聚合groupby 以及常规聚合函数的运用

# encoding=utf-8
from pyspark.sql import SparkSession,functions as F
ss = SparkSession.builder.getOrCreate()

df = ss.read.csv('hdfs://node1:8020/test/students.csv',header=True,sep=',')

df.show()

res = df.select(df.gender,df.cls,df.age.cast('int').alias('age')).groupBy('gender','cls').sum("age").alias('age')
res.show()

res1 = (df.select(df.gender,df.cls,df.age.cast('int').alias('age'))
        .groupBy('gender','cls')
        .agg(F.avg("age").alias('age'),
             F.sum('age').alias('age1')
             )
             )
res1.show()

4,排序orderby的运用

# encoding=utf-8
from pyspark.sql import SparkSession

ss =SparkSession.builder.getOrCreate()

df = ss.read.csv('hdfs://node1:8020/test/students.csv',header=True,sep=',')

#默认升序
df.orderBy('age').show()
#修改为降序
df.orderBy('age',ascending=False).show()

#分页 limit
df.limit(5).show()

#实现sql中 m n 效果的分页  转回RDD 然后切片的方式
rdd = df.rdd
print(rdd.collect()[1:6])

5,分页limit的运用

# encoding=utf-8
from pyspark.sql import SparkSession

ss =SparkSession.builder.getOrCreate()

df = ss.read.csv('hdfs://node1:8020/test/students.csv',header=True,sep=',')

#默认升序
df.orderBy('age').show()
#修改为降序
df.orderBy('age',ascending=False).show()

#分页 limit
df.limit(5).show()

#实现sql中 m n 效果的分页  转回RDD 然后切片的方式
rdd = df.rdd
print(rdd.collect()[1:6])

6,连接join的运用

# encoding=utf-8

from pyspark.sql import SparkSession,functions as F

ss = SparkSession.builder.getOrCreate()

df = ss.read.csv('hdfs://node1:8020/test/stu.csv',header=True,sep=',')
df1 = ss.read.csv('hdfs://node1:8020/test/students.csv',header=True,sep=',')

# df.show()
# print('---------------------------')
# df1.show()
# #内连接
df_jion = df.join(df1,'id','inner')
df_jion.show()

#左连接
df_leftjoin = df.join(df1,on='id',how='left')
df_leftjoin.show()

#右连接
df_rightjoin = df.join(df1,how='right',on='in')
df_rightjoin.show()

7,合并union的运用

from pyspark.sql import SparkSession

ss =SparkSession.builder.getOrCreate()

# 读取文件数据
df1 = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',',schema='user_id string,username string,sex string,age string,cls string')
df2 = ss.read.csv('hdfs://node1:8020/data/students2.csv',header=False,sep=',',schema='user_id string,username string,sex string,age string,cls string')

# 两表进行关联  合并后不会去重
df_union = df1.union(df2)

df_unionAll =  df1.unionAll(df2)

# 合并后的数据去重
df_distinct =  df_union.distinct().orderBy('user_id')

# 查看数据
df_union.show(100)
print('****************************************************************')
df_unionAll.show(100)
print('****************************************************************')
df_distinct.show(100)

四,总结

  • DataFrame创建又四种方式: - row+schema对象创建- RDD转为df对象- Pandas中的df对象转为Spark中的df对象- 从文件中读取创建
  • DataFrame支持代码中嵌套sql查询直接使用
  • DataFrame也支持DSL(特定领域的语言)使用 - 映射指定的列用select- 条件过滤用 where- 分组聚合使用 group by- 排序使用 order by- 分页使用limit- 连接使用 join 左,右,内,全- 合并使用 union 与unionAll
  • 学习:知识的初次邂逅
  • 复习:知识的温故知新
  • 练习:知识的实践应用
标签: spark sql 大数据

本文转载自: https://blog.csdn.net/qq_55006020/article/details/142906077
版权归原作者 凡梦_leo 所有, 如有侵权,请联系我们删除。

“Spark SQL之DataFrame,df对象的创建与使用”的评论:

还没有评论