0


Spark SQL之DataFrame,df对象的创建与使用

  • 学习:知识的初次邂逅
  • 复习:知识的温故知新
  • 练习:知识的实践应用

经历了前几篇的大占比理论之后,本篇代码会比较多,重点是df对象的创建与使用

一,DataFrame的创建

  • 创建datafram数据
  • 需要使用一个sparksession的类创建
  • SparkSession类是在SparkContext的基础上进行了封装
  • 也就是SparkSession类中包含了SparkContext

1,使用row+schema组合创建

  1. # encoding=utf-8
  2. from pyspark.sql import Row,SparkSession
  3. from pyspark.sql.types import *
  4. #创建行格式
  5. r1 = Row(1,'凡梦',18,'男')
  6. r2 = Row(id=2,name = '七七',age = 18,sex = '女')
  7. #创建表数据结构
  8. schema_type = (StructType()
  9. .add('id',IntegerType(),nullable=False)
  10. .add('name',StringType())
  11. .add('age',IntegerType())
  12. .add('sex',StringType()))
  13. #创建df对象
  14. #固定写法 第一步生成spark会话对象
  15. ss = SparkSession.builder.getOrCreate()
  16. #第二步 生成df对象
  17. df = ss.createDataFrame([r1,r2],schema=schema_type)
  18. print(df.show())
  19. print('-'*50)
  20. print(df)
  21. print('-'*50)
  22. print(df.printSchema())

+---+----+---+---+
| id|name|age|sex|
+---+----+---+---+
| 1|凡梦| 18| 男|
| 2|七七| 18| 女|
+---+----+---+---+

None

DataFrame[id: int, name: string, age: int, sex: string]

root
|-- id: integer (nullable = false)
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- sex: string (nullable = true)

None

2,把RDD转为DataFrame对象

  1. # encoding=utf-8
  2. #这么创建RDD对象会直接报错
  3. '''
  4. from pyspark import SparkContext
  5. sc = SparkContext()
  6. rdd = sc.parallelize([[1,'张三'],[2,'王五']])
  7. '''
  8. from pyspark.sql import SparkSession
  9. #创建 sc 对象
  10. ss = SparkSession.builder.getOrCreate()
  11. sc = ss.sparkContext
  12. rdd = sc.parallelize([[1,'张三'],[2,'李四']])
  13. # 转rdd为df对象
  14. df = rdd.toDF(schema='id int,name string')
  15. df.show()
  16. print('查看表结构')
  17. df.printSchema()
  18. #df转回RDD
  19. rdd2 = df.rdd
  20. #转回的RDD
  21. print(rdd2.collect())

+---+----+
| id|name|
+---+----+
| 1|张三|
| 2|李四|
+---+----+

查看表结构
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)

[Row(id=1, name='张三'), Row(id=2, name='李四')]

3,把pandas中的df对象转为spark的DataFrame对象

  1. # encoding=utf-8
  2. '''
  3. pandas中的dataframe是 由一列一列series构成
  4. spark中的dataframe是 有一行一行的row构成
  5. pandas中的dataframe是单机的
  6. spark中的dataframe是分布式的
  7. 他们之间的方法不是互通的
  8. spark中创建dataframe的方法有
  9. 一,row+源数据 转为dataframe
  10. 二,rdd 转为dataframe
  11. 三,pandas中的df对象 转为dataframe
  12. '''
  13. from pyspark.sql import SparkSession
  14. import pandas as pd
  15. #创建pandas中的dataframe对象
  16. data = [[1, 'A', 'm', 2500], [2, 'B', 'f', 1500], [3, 'C', 'm', 5500], [4, 'D', 'f', 500]]
  17. pandas_df = (pd.DataFrame(data, columns=['id', 'name', 'sex', 'salary'])
  18. .astype({'id':'Int64', 'name':'object', 'sex':'object', 'salary':'Int64'}))
  19. print('*'*50,'下面开始是pandas中的查看datafarme数据结构与数据内容')
  20. print(pandas_df) #打印数据
  21. print('查看数据结构')
  22. print(pandas_df.info())
  23. print('*'*50,'下面开始是spark中的查看ddatafarme数据结构与数据内容')
  24. #把pandas中的df对象转为spark中的pandas对象
  25. ss = SparkSession.builder.getOrCreate()
  26. # createDataFrame 不仅仅可以放row和源数据进去 生成dataframe
  27. #也可以直接放入 pandas中的 df对象 从而生成 spark中的dataframe
  28. spark_df = ss.createDataFrame(pandas_df)
  29. spark_df.show()
  30. print('查看数据结构')
  31. spark_df.printSchema()
  32. #todo ---------------------------演示取值的明细区别------------------------------
  33. print('*'*50,'下面开始是pandas中的datafarme')
  34. #按行取值 pandas中的dataframe
  35. res = pandas_df['name'] #获取指定的列 这个列是一个series对象
  36. print(res)
  37. print(res[0]) #根据列 取索引第一个
  38. print('*'*50,'下面开始是spark中的datafarme')
  39. #按列取值 spark中的datafarme
  40. res1 = spark_df.limit(1).first() # 获取指定的行 这一行是个row对象
  41. print(res1)
  42. print(res1['name']) #根据此行的信息 取name对应的值
  43. #todo --------------------------把spark_df 转为pandas_df------------------------------
  44. pa_df2 =spark_df.toPandas()
  45. print(pa_df2)

************************************************** 下面开始是pandas中的查看datafarme数据结构与数据内容
id name sex salary
0 1 A m 2500
1 2 B f 1500
2 3 C m 5500
3 4 D f 500
查看数据结构
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4 entries, 0 to 3
Data columns (total 4 columns):

Column Non-Null Count Dtype


0 id 4 non-null Int64
1 name 4 non-null object
2 sex 4 non-null object
3 salary 4 non-null Int64
dtypes: Int64(2), object(2)
memory usage: 264.0+ bytes
None
************************************************** 下面开始是spark中的查看ddatafarme数据结构与数据内容
Warning: Ignoring non-Spark config property: ;
24/10/13 23:10:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/10/13 23:10:30 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
+---+----+---+------+
| id|name|sex|salary|
+---+----+---+------+
| 1| A| m| 2500|
| 2| B| f| 1500|
| 3| C| m| 5500|
| 4| D| f| 500|
+---+----+---+------+

查看数据结构
root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
|-- sex: string (nullable = true)
|-- salary: long (nullable = true)

************************************************** 下面开始是pandas中的datafarme
0 A
1 B
2 C
3 D
Name: name, dtype: object
A
************************************************** 下面开始是spark中的datafarme
Row(id=1, name='A', sex='m', salary=2500)
A
id name sex salary
0 1 A m 2500
1 2 B f 1500
2 3 C m 5500
3 4 D f 500

4,读取文件创建DataFrame对象

  1. # encoding=utf-8
  2. '''
  3. sparksql 支持对非结构化数据
  4. 和半结构化数据
  5. 的数据读取
  6. '''
  7. from pyspark.sql import SparkSession
  8. #创建会话对象
  9. ss =SparkSession.builder.getOrCreate()
  10. # 读取csv文件 设置是否获取表头 与指定分隔符
  11. df = ss.read.csv('hdfs://node1:8020/test/students.csv',header=True,sep=',')
  12. df.show()
  13. #读取csv文件 也可以自己指定表头
  14. df1 = ss.read.csv('hdfs://node1:8020/test/students.csv',header=False,sep=',',schema='uid string,uname string,sex string,age string,cls string')
  15. df1.show()
  16. #读取json数据 会自动把键转为字段名 把只作为列的值
  17. df_json = ss.read.json('hdfs://node1:8020/test/employees.json')
  18. df_json.show()
  19. #读取orc格式数据
  20. df_orc = ss.read.orc('hdfs://node1:8020/test/users.orc')
  21. df_orc.show()
  22. #获取parquet数据
  23. df_parquet = ss.read.parquet('hdfs://node1:8020/test/users.parquet')
  24. df_parquet.show()

二,DataFrame的SQL使用方法

使用sparksession提供的sql方法,编写sql语句执行

  • 第一步 创建sparksql的会话对象
  • 第二步 读取数据
  • 第三步 创建临时表
  • 第四步 编写sql
  • 第五步 执行sql
  • 第六步 查看结果
  1. # encoding=utf-8
  2. '''
  3. 第一步 创建sparksql的会话对象
  4. 第二步 读取数据
  5. 第三步 创建临时表
  6. 第四步 编写sql
  7. 第五步 执行sql
  8. 第六步 查看结果
  9. '''
  10. from pyspark.sql import SparkSession
  11. ss =SparkSession.builder.getOrCreate()
  12. df = ss.read.csv('hdfs://node1:8020/test/students.csv',header=True,sep=',')
  13. df.createTempView('tb_student')
  14. sql_str = '''
  15. select gender,avg(age) as avg_age from tb_student group by gender
  16. '''
  17. res = ss.sql(sql_str)
  18. res.show()

三,DataFrame的DSL使用方法

DSL方法是df提供的数据操作函数


  • 使用方式
  • df.方法()
  • 可以进行链式调用
  • df.方法().方法().方法()
  • 方法执行后返回一个新的df保存计算结果
  • new_df = df.方法()
  1. spark提供DSL方法和sql的关键词一样,使用方式和sql基本类似,在进行数据处理时,要按照sql的执行顺序去思考如何处理数据
  2. from join 知道数据在哪 df本身就是要处理的数据 df.join(df2) from 表
  3. where 过滤需要处理的数据 df.join(df2).where()
  4. group by 聚合 数据的计算 df.join(df2).where().groupby().sum()
  5. having 计算后的数据进行过滤 df.join(df2).where().groupby().sum().where()
  6. select 展示数据的字段 df.join(df2).where().groupby().sum().where().select()
  7. order by 展示数据的排序 df.join(df2).where().groupby().sum().where().select().orderBy()
  8. limit 展示数据的数量 df.join(df2).where().groupby().sum().where().select().orderBy().limit()

**

  1. DSL方法执行完成后会得到一个处理后的新的df

**

1,select映射指定的列,以及取别名

  1. # encoding=utf-8
  2. '''
  3. 第一步 创建ss对象
  4. 第二步 导入scv数据
  5. 第三步 开始查询
  6. '''
  7. from pyspark.sql import SparkSession
  8. ss =SparkSession.builder.getOrCreate()
  9. df = ss.read.csv('hdfs://node1:8020/test/students.csv',header=True,sep=',')
  10. df.show()
  11. #todo ------------------------取指定的列----------------------
  12. #查询指定的列
  13. df.select('id','name').show(5)
  14. #另外一种方式
  15. df.select(df.id,df.gender).show(5)
  16. #切片的方式
  17. df.select(df['name'],df['cls']).show(5)
  18. #todo ------------------------取别名------------------------
  19. df.select(df.id.alias('user_id')).show(5)
  20. #todo ------------------------更改数据类型------------------------
  21. df.printSchema()
  22. df.select(df.age.cast('int')).printSchema()

2,过滤where的运用

  1. # encoding=utf-8
  2. from pyspark.sql import SparkSession
  3. ss = SparkSession.builder.getOrCreate()
  4. df = ss.read.csv('hdfs://node1:8020/test/students.csv',header=True,sep=',')
  5. # 普通查询 本质是调用了filter
  6. df.where('age>20').show()
  7. # 多条件查询
  8. df.where("age>=20 and gender='女'").show()
  9. # 使用python中的方法
  10. df.where(df.age==20).show()

3,分组聚合groupby 以及常规聚合函数的运用

  1. # encoding=utf-8
  2. from pyspark.sql import SparkSession,functions as F
  3. ss = SparkSession.builder.getOrCreate()
  4. df = ss.read.csv('hdfs://node1:8020/test/students.csv',header=True,sep=',')
  5. df.show()
  6. res = df.select(df.gender,df.cls,df.age.cast('int').alias('age')).groupBy('gender','cls').sum("age").alias('age')
  7. res.show()
  8. res1 = (df.select(df.gender,df.cls,df.age.cast('int').alias('age'))
  9. .groupBy('gender','cls')
  10. .agg(F.avg("age").alias('age'),
  11. F.sum('age').alias('age1')
  12. )
  13. )
  14. res1.show()

4,排序orderby的运用

  1. # encoding=utf-8
  2. from pyspark.sql import SparkSession
  3. ss =SparkSession.builder.getOrCreate()
  4. df = ss.read.csv('hdfs://node1:8020/test/students.csv',header=True,sep=',')
  5. #默认升序
  6. df.orderBy('age').show()
  7. #修改为降序
  8. df.orderBy('age',ascending=False).show()
  9. #分页 limit
  10. df.limit(5).show()
  11. #实现sql中 m n 效果的分页 转回RDD 然后切片的方式
  12. rdd = df.rdd
  13. print(rdd.collect()[1:6])

5,分页limit的运用

  1. # encoding=utf-8
  2. from pyspark.sql import SparkSession
  3. ss =SparkSession.builder.getOrCreate()
  4. df = ss.read.csv('hdfs://node1:8020/test/students.csv',header=True,sep=',')
  5. #默认升序
  6. df.orderBy('age').show()
  7. #修改为降序
  8. df.orderBy('age',ascending=False).show()
  9. #分页 limit
  10. df.limit(5).show()
  11. #实现sql中 m n 效果的分页 转回RDD 然后切片的方式
  12. rdd = df.rdd
  13. print(rdd.collect()[1:6])

6,连接join的运用

  1. # encoding=utf-8
  2. from pyspark.sql import SparkSession,functions as F
  3. ss = SparkSession.builder.getOrCreate()
  4. df = ss.read.csv('hdfs://node1:8020/test/stu.csv',header=True,sep=',')
  5. df1 = ss.read.csv('hdfs://node1:8020/test/students.csv',header=True,sep=',')
  6. # df.show()
  7. # print('---------------------------')
  8. # df1.show()
  9. # #内连接
  10. df_jion = df.join(df1,'id','inner')
  11. df_jion.show()
  12. #左连接
  13. df_leftjoin = df.join(df1,on='id',how='left')
  14. df_leftjoin.show()
  15. #右连接
  16. df_rightjoin = df.join(df1,how='right',on='in')
  17. df_rightjoin.show()

7,合并union的运用

  1. from pyspark.sql import SparkSession
  2. ss =SparkSession.builder.getOrCreate()
  3. # 读取文件数据
  4. df1 = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',',schema='user_id string,username string,sex string,age string,cls string')
  5. df2 = ss.read.csv('hdfs://node1:8020/data/students2.csv',header=False,sep=',',schema='user_id string,username string,sex string,age string,cls string')
  6. # 两表进行关联 合并后不会去重
  7. df_union = df1.union(df2)
  8. df_unionAll = df1.unionAll(df2)
  9. # 合并后的数据去重
  10. df_distinct = df_union.distinct().orderBy('user_id')
  11. # 查看数据
  12. df_union.show(100)
  13. print('****************************************************************')
  14. df_unionAll.show(100)
  15. print('****************************************************************')
  16. df_distinct.show(100)

四,总结

  • DataFrame创建又四种方式: - row+schema对象创建- RDD转为df对象- Pandas中的df对象转为Spark中的df对象- 从文件中读取创建
  • DataFrame支持代码中嵌套sql查询直接使用
  • DataFrame也支持DSL(特定领域的语言)使用 - 映射指定的列用select- 条件过滤用 where- 分组聚合使用 group by- 排序使用 order by- 分页使用limit- 连接使用 join 左,右,内,全- 合并使用 union 与unionAll
  • 学习:知识的初次邂逅
  • 复习:知识的温故知新
  • 练习:知识的实践应用
标签: spark sql 大数据

本文转载自: https://blog.csdn.net/qq_55006020/article/details/142906077
版权归原作者 凡梦_leo 所有, 如有侵权,请联系我们删除。

“Spark SQL之DataFrame,df对象的创建与使用”的评论:

还没有评论