- 学习:知识的初次邂逅
- 复习:知识的温故知新
- 练习:知识的实践应用
经历了前几篇的大占比理论之后,本篇代码会比较多,重点是df对象的创建与使用
一,DataFrame的创建
- 创建datafram数据
- 需要使用一个sparksession的类创建
- SparkSession类是在SparkContext的基础上进行了封装
- 也就是SparkSession类中包含了SparkContext
1,使用row+schema组合创建
# encoding=utf-8
from pyspark.sql import Row,SparkSession
from pyspark.sql.types import *
#创建行格式
r1 = Row(1,'凡梦',18,'男')
r2 = Row(id=2,name = '七七',age = 18,sex = '女')
#创建表数据结构
schema_type = (StructType()
.add('id',IntegerType(),nullable=False)
.add('name',StringType())
.add('age',IntegerType())
.add('sex',StringType()))
#创建df对象
#固定写法 第一步生成spark会话对象
ss = SparkSession.builder.getOrCreate()
#第二步 生成df对象
df = ss.createDataFrame([r1,r2],schema=schema_type)
print(df.show())
print('-'*50)
print(df)
print('-'*50)
print(df.printSchema())
+---+----+---+---+
| id|name|age|sex|
+---+----+---+---+
| 1|凡梦| 18| 男|
| 2|七七| 18| 女|
+---+----+---+---+None
DataFrame[id: int, name: string, age: int, sex: string]
root
|-- id: integer (nullable = false)
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- sex: string (nullable = true)None
2,把RDD转为DataFrame对象
# encoding=utf-8
#这么创建RDD对象会直接报错
'''
from pyspark import SparkContext
sc = SparkContext()
rdd = sc.parallelize([[1,'张三'],[2,'王五']])
'''
from pyspark.sql import SparkSession
#创建 sc 对象
ss = SparkSession.builder.getOrCreate()
sc = ss.sparkContext
rdd = sc.parallelize([[1,'张三'],[2,'李四']])
# 转rdd为df对象
df = rdd.toDF(schema='id int,name string')
df.show()
print('查看表结构')
df.printSchema()
#df转回RDD
rdd2 = df.rdd
#转回的RDD
print(rdd2.collect())
+---+----+
| id|name|
+---+----+
| 1|张三|
| 2|李四|
+---+----+查看表结构
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)[Row(id=1, name='张三'), Row(id=2, name='李四')]
3,把pandas中的df对象转为spark的DataFrame对象
# encoding=utf-8
'''
pandas中的dataframe是 由一列一列series构成
spark中的dataframe是 有一行一行的row构成
pandas中的dataframe是单机的
spark中的dataframe是分布式的
他们之间的方法不是互通的
spark中创建dataframe的方法有
一,row+源数据 转为dataframe
二,rdd 转为dataframe
三,pandas中的df对象 转为dataframe
'''
from pyspark.sql import SparkSession
import pandas as pd
#创建pandas中的dataframe对象
data = [[1, 'A', 'm', 2500], [2, 'B', 'f', 1500], [3, 'C', 'm', 5500], [4, 'D', 'f', 500]]
pandas_df = (pd.DataFrame(data, columns=['id', 'name', 'sex', 'salary'])
.astype({'id':'Int64', 'name':'object', 'sex':'object', 'salary':'Int64'}))
print('*'*50,'下面开始是pandas中的查看datafarme数据结构与数据内容')
print(pandas_df) #打印数据
print('查看数据结构')
print(pandas_df.info())
print('*'*50,'下面开始是spark中的查看ddatafarme数据结构与数据内容')
#把pandas中的df对象转为spark中的pandas对象
ss = SparkSession.builder.getOrCreate()
# createDataFrame 不仅仅可以放row和源数据进去 生成dataframe
#也可以直接放入 pandas中的 df对象 从而生成 spark中的dataframe
spark_df = ss.createDataFrame(pandas_df)
spark_df.show()
print('查看数据结构')
spark_df.printSchema()
#todo ---------------------------演示取值的明细区别------------------------------
print('*'*50,'下面开始是pandas中的datafarme')
#按行取值 pandas中的dataframe
res = pandas_df['name'] #获取指定的列 这个列是一个series对象
print(res)
print(res[0]) #根据列 取索引第一个
print('*'*50,'下面开始是spark中的datafarme')
#按列取值 spark中的datafarme
res1 = spark_df.limit(1).first() # 获取指定的行 这一行是个row对象
print(res1)
print(res1['name']) #根据此行的信息 取name对应的值
#todo --------------------------把spark_df 转为pandas_df------------------------------
pa_df2 =spark_df.toPandas()
print(pa_df2)
************************************************** 下面开始是pandas中的查看datafarme数据结构与数据内容
id name sex salary
0 1 A m 2500
1 2 B f 1500
2 3 C m 5500
3 4 D f 500
查看数据结构
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4 entries, 0 to 3
Data columns (total 4 columns):Column Non-Null Count Dtype
0 id 4 non-null Int64
1 name 4 non-null object
2 sex 4 non-null object
3 salary 4 non-null Int64
dtypes: Int64(2), object(2)
memory usage: 264.0+ bytes
None
************************************************** 下面开始是spark中的查看ddatafarme数据结构与数据内容
Warning: Ignoring non-Spark config property: ;
24/10/13 23:10:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/10/13 23:10:30 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
+---+----+---+------+
| id|name|sex|salary|
+---+----+---+------+
| 1| A| m| 2500|
| 2| B| f| 1500|
| 3| C| m| 5500|
| 4| D| f| 500|
+---+----+---+------+查看数据结构
root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
|-- sex: string (nullable = true)
|-- salary: long (nullable = true)************************************************** 下面开始是pandas中的datafarme
0 A
1 B
2 C
3 D
Name: name, dtype: object
A
************************************************** 下面开始是spark中的datafarme
Row(id=1, name='A', sex='m', salary=2500)
A
id name sex salary
0 1 A m 2500
1 2 B f 1500
2 3 C m 5500
3 4 D f 500
4,读取文件创建DataFrame对象
# encoding=utf-8
'''
sparksql 支持对非结构化数据
和半结构化数据
的数据读取
'''
from pyspark.sql import SparkSession
#创建会话对象
ss =SparkSession.builder.getOrCreate()
# 读取csv文件 设置是否获取表头 与指定分隔符
df = ss.read.csv('hdfs://node1:8020/test/students.csv',header=True,sep=',')
df.show()
#读取csv文件 也可以自己指定表头
df1 = ss.read.csv('hdfs://node1:8020/test/students.csv',header=False,sep=',',schema='uid string,uname string,sex string,age string,cls string')
df1.show()
#读取json数据 会自动把键转为字段名 把只作为列的值
df_json = ss.read.json('hdfs://node1:8020/test/employees.json')
df_json.show()
#读取orc格式数据
df_orc = ss.read.orc('hdfs://node1:8020/test/users.orc')
df_orc.show()
#获取parquet数据
df_parquet = ss.read.parquet('hdfs://node1:8020/test/users.parquet')
df_parquet.show()
二,DataFrame的SQL使用方法
使用sparksession提供的sql方法,编写sql语句执行
- 第一步 创建sparksql的会话对象
- 第二步 读取数据
- 第三步 创建临时表
- 第四步 编写sql
- 第五步 执行sql
- 第六步 查看结果
# encoding=utf-8
'''
第一步 创建sparksql的会话对象
第二步 读取数据
第三步 创建临时表
第四步 编写sql
第五步 执行sql
第六步 查看结果
'''
from pyspark.sql import SparkSession
ss =SparkSession.builder.getOrCreate()
df = ss.read.csv('hdfs://node1:8020/test/students.csv',header=True,sep=',')
df.createTempView('tb_student')
sql_str = '''
select gender,avg(age) as avg_age from tb_student group by gender
'''
res = ss.sql(sql_str)
res.show()
三,DataFrame的DSL使用方法
DSL方法是df提供的数据操作函数
- 使用方式
- df.方法()
- 可以进行链式调用
- df.方法().方法().方法()
- 方法执行后返回一个新的df保存计算结果
- new_df = df.方法()
- spark提供DSL方法和sql的关键词一样,使用方式和sql基本类似,在进行数据处理时,要按照sql的执行顺序去思考如何处理数据
- from join 知道数据在哪 df本身就是要处理的数据 df.join(df2) from 表
- where 过滤需要处理的数据 df.join(df2).where()
- group by 聚合 数据的计算 df.join(df2).where().groupby().sum()
- having 计算后的数据进行过滤 df.join(df2).where().groupby().sum().where()
- select 展示数据的字段 df.join(df2).where().groupby().sum().where().select()
- order by 展示数据的排序 df.join(df2).where().groupby().sum().where().select().orderBy()
- limit 展示数据的数量 df.join(df2).where().groupby().sum().where().select().orderBy().limit()
**
DSL方法执行完成后会得到一个处理后的新的df
**
1,select映射指定的列,以及取别名
# encoding=utf-8
'''
第一步 创建ss对象
第二步 导入scv数据
第三步 开始查询
'''
from pyspark.sql import SparkSession
ss =SparkSession.builder.getOrCreate()
df = ss.read.csv('hdfs://node1:8020/test/students.csv',header=True,sep=',')
df.show()
#todo ------------------------取指定的列----------------------
#查询指定的列
df.select('id','name').show(5)
#另外一种方式
df.select(df.id,df.gender).show(5)
#切片的方式
df.select(df['name'],df['cls']).show(5)
#todo ------------------------取别名------------------------
df.select(df.id.alias('user_id')).show(5)
#todo ------------------------更改数据类型------------------------
df.printSchema()
df.select(df.age.cast('int')).printSchema()
2,过滤where的运用
# encoding=utf-8
from pyspark.sql import SparkSession
ss = SparkSession.builder.getOrCreate()
df = ss.read.csv('hdfs://node1:8020/test/students.csv',header=True,sep=',')
# 普通查询 本质是调用了filter
df.where('age>20').show()
# 多条件查询
df.where("age>=20 and gender='女'").show()
# 使用python中的方法
df.where(df.age==20).show()
3,分组聚合groupby 以及常规聚合函数的运用
# encoding=utf-8
from pyspark.sql import SparkSession,functions as F
ss = SparkSession.builder.getOrCreate()
df = ss.read.csv('hdfs://node1:8020/test/students.csv',header=True,sep=',')
df.show()
res = df.select(df.gender,df.cls,df.age.cast('int').alias('age')).groupBy('gender','cls').sum("age").alias('age')
res.show()
res1 = (df.select(df.gender,df.cls,df.age.cast('int').alias('age'))
.groupBy('gender','cls')
.agg(F.avg("age").alias('age'),
F.sum('age').alias('age1')
)
)
res1.show()
4,排序orderby的运用
# encoding=utf-8
from pyspark.sql import SparkSession
ss =SparkSession.builder.getOrCreate()
df = ss.read.csv('hdfs://node1:8020/test/students.csv',header=True,sep=',')
#默认升序
df.orderBy('age').show()
#修改为降序
df.orderBy('age',ascending=False).show()
#分页 limit
df.limit(5).show()
#实现sql中 m n 效果的分页 转回RDD 然后切片的方式
rdd = df.rdd
print(rdd.collect()[1:6])
5,分页limit的运用
# encoding=utf-8
from pyspark.sql import SparkSession
ss =SparkSession.builder.getOrCreate()
df = ss.read.csv('hdfs://node1:8020/test/students.csv',header=True,sep=',')
#默认升序
df.orderBy('age').show()
#修改为降序
df.orderBy('age',ascending=False).show()
#分页 limit
df.limit(5).show()
#实现sql中 m n 效果的分页 转回RDD 然后切片的方式
rdd = df.rdd
print(rdd.collect()[1:6])
6,连接join的运用
# encoding=utf-8
from pyspark.sql import SparkSession,functions as F
ss = SparkSession.builder.getOrCreate()
df = ss.read.csv('hdfs://node1:8020/test/stu.csv',header=True,sep=',')
df1 = ss.read.csv('hdfs://node1:8020/test/students.csv',header=True,sep=',')
# df.show()
# print('---------------------------')
# df1.show()
# #内连接
df_jion = df.join(df1,'id','inner')
df_jion.show()
#左连接
df_leftjoin = df.join(df1,on='id',how='left')
df_leftjoin.show()
#右连接
df_rightjoin = df.join(df1,how='right',on='in')
df_rightjoin.show()
7,合并union的运用
from pyspark.sql import SparkSession
ss =SparkSession.builder.getOrCreate()
# 读取文件数据
df1 = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',',schema='user_id string,username string,sex string,age string,cls string')
df2 = ss.read.csv('hdfs://node1:8020/data/students2.csv',header=False,sep=',',schema='user_id string,username string,sex string,age string,cls string')
# 两表进行关联 合并后不会去重
df_union = df1.union(df2)
df_unionAll = df1.unionAll(df2)
# 合并后的数据去重
df_distinct = df_union.distinct().orderBy('user_id')
# 查看数据
df_union.show(100)
print('****************************************************************')
df_unionAll.show(100)
print('****************************************************************')
df_distinct.show(100)
四,总结
- DataFrame创建又四种方式: - row+schema对象创建- RDD转为df对象- Pandas中的df对象转为Spark中的df对象- 从文件中读取创建
- DataFrame支持代码中嵌套sql查询直接使用
- DataFrame也支持DSL(特定领域的语言)使用 - 映射指定的列用select- 条件过滤用 where- 分组聚合使用 group by- 排序使用 order by- 分页使用limit- 连接使用 join 左,右,内,全- 合并使用 union 与unionAll
- 学习:知识的初次邂逅
- 复习:知识的温故知新
- 练习:知识的实践应用
版权归原作者 凡梦_leo 所有, 如有侵权,请联系我们删除。