0


【pyspark从入门到放弃】DataFrame

环境安装

pyspark支持通过pypip、conda下载,或者手动下载。
笔者通过

pip install

命令从pypip下载并配置安装了3.5.0版本的Spark。

创建实例

使用spark的第一步就是拿到一个

SparkSession

对象。最简单的方法是

SparkSession.builder.getOrCreate()

即,直接使用默认参数创建实例。也可以做一些配置,比如

SparkSession.builder \
        .appName(app_name) \
        .enableHiveSupport() \
        .getOrCreate()

DataFrame

创建DataFrame

DataFrame

是类似

pandas

库中的

DataFrame

的类型,可以转换为SparkSession支持的View。
创建一个

DataFrame

通常使用

SparkSession#createDataFrame

命令。如要创建一个

DataFrame

满足第一列的名称是

integer_value

,类型是整形;第二列的名称是

text_value

,类型是字符串;共有三行数据,分别为(1, ‘a’), (2, ‘b’), (3, ‘abcdefghijklmnopqrstuvwxyz’),则可以使用下面命令创建

data_frame: pyspark.sql.DataFrame = spark.createDataFrame([
    Row(integer_value=1, text_value='a'),
    Row(integer_value=2, text_value='b'),
    Row(integer_value=3, text_value='abcdefghijklmnopqrstuvwxyz'),], schema='integer_value int, text_value string')

查看DataFrame

  • 使用DataFrame#show可以查看数据内容。
  • 使用DataFrame#printSchema可以查看数据结构。
show

执行

data_frame.show()

即可查看

DataFrame

数据内容,得到结果如下:

+-------------+--------------------+
|integer_value|          text_value|
+-------------+--------------------+
|            1|                   a|
|            2|                   b|
|            3|abcdefghijklmnopq...|
+-------------+--------------------+
show

方法其实有三个参数如下:
参数名类型描述nint只取头部的n个数据,不传取所有数据truncateUnion[bool, int]不填或者True或者小于等于0的整数:
如果字段长度不超过20个字符,则全部展示;
如果字段长度超过20,则只展示前面17个字符,跟随

...

表示只展示部分

False:
展示字段的全部内容

大于0的整数:假设数字为

num


如果字段长度不超过

num

,则全部展示;
如果字段长度超过

num

,而且

num > 3

,则只展示前面

num-3

个字符,跟随

...

表示只展示部分
如果字段长度超过

num

,而且

num ≤ 3

,则只展示前面

num

个字符verticalbool是否竖向展示数据。
不填或者False:表格形式展示数据
True:键值对列表形式展示数据

n=2

,执行

data_frame.show(n=2)

得到结果如下:

+-------------+----------+
|integer_value|text_value|
+-------------+----------+
|            1|         a|
|            2|         b|
+-------------+----------+

truncate=3

,执行

data_frame.show(truncate=3)

,得到结果如下:

+-------------+----------+
|integer_value|text_value|
+-------------+----------+
|            1|         a|
|            2|         b|
|            3|       abc|
+-------------+----------+

truncate=4

,执行

data_frame.show(truncate=4)

,得到结果如下:


+-------------+----------+
|integer_value|text_value|
+-------------+----------+
|            1|         a|
|            2|         b|
|            3|      a...|
+-------------+----------+

truncate=True

,执行

data_frame.show(truncate=True)

,得到结果如下:


+-------------+--------------------+
|integer_value|          text_value|
+-------------+--------------------+
|            1|                   a|
|            2|                   b|
|            3|abcdefghijklmnopq...|
+-------------+--------------------+

vertical=True

,执行

data_frame.show(vertical=True)

,得到结果如下:

-RECORD 0-----------------------------
 integer_value | 1                    
 text_value    | a                    
-RECORD 1-----------------------------
 integer_value | 2                    
 text_value    | b                    
-RECORD 2-----------------------------
 integer_value | 3                    
 text_value    | abcdefghijklmnopq... 
columns

获取 列名组成的列表。执行

print(data_frame.columns)

得到打印结果如下:

['integer_value', 'text_value']
printSchema

执行

data_frame.printSchema()

查看

DataFrame

结构,得到结果如下:

root
 |-- integer_value: integer (nullable = true)
 |-- text_value: string (nullable = true)
select
select

方法可以案列打印数据,如执行

data_frame.select("integer_value", "text_value").show()

得到结果:

+-------------+--------------------+
|integer_value|          text_value|
+-------------+--------------------+
|            1|                   a|
|            2|                   b|
|            3|abcdefghijklmnopq...|
+-------------+--------------------+

执行

data_frame.select("integer_value").show()

得到结果:

+-------------+
|integer_value|
+-------------+
|            1|
|            2|
|            3|
+-------------+
describe
describe

方法用于计算数据的基本统计特征,包括数量(count)、平均数(mean)、标准差(stddev)、最小值(min)、最大值(max)等。
执行

data_frame.describe().show()

得到结果:

+-------+-------------+----------+
|summary|integer_value|text_value|
+-------+-------------+----------+
|  count|            3|         3|
|   mean|          2.0|      NULL|
| stddev|          1.0|      NULL|
|    min|            1|         a|
|    max|            3|         b|
+-------+-------------+----------+
describe

方法也可以指定列计算。执行

ddata_frame.describe(['integer_value', 'text_value']).show()

得到相同结果

summary
summary

方法和

describe

一样,用于计算数据的基本统计特征,不能指定要统计的列,但是拥有更多字段,包括数量(count)、平均数(mean)、标准差(stddev)、最小值(min)、25%位数(25%)中位数(100%)75%位数(75%)、最大值(max)等。
执行

data_frame.summary().show()

得到结果:

+-------+-------------+----------+
|summary|integer_value|text_value|
+-------+-------------+----------+
|  count|            3|         3|
|   mean|          2.0|      NULL|
| stddev|          1.0|      NULL|
|    min|            1|         a|
|    25%|            1|      NULL|
|    50%|            2|      NULL|
|    75%|            3|      NULL|
|    max|            3|         b|
+-------+-------------+----------+

配合

select

方法指定目标列,即可解决

summary

方法不能指定列的问题。

collect
collect

方法用于将

DataFrame

中的数据写入内存中,数据量太大可能导致

out-of-memory

异常。执行

print(data_frame.collect())

得到打印结果:

[Row(integer_value=1, text_value='a'), Row(integer_value=2, text_value='b'), Row(integer_value=3, text_value='abcdefghijklmnopqrstuvwxyz')]
take和tail

数据量太大的情况下使用

collect

方法可能导致内存溢出,

take

方法和

tail

方法通过限制查询的数据条数,来规避此问题。

take

方法支持从前往后查询指定个数的数据;

tail

方法支持从后往前查询指定个数的数据。

filter
filter

方法支持对

DataFrame

中的数据进行过滤,便于准确找到目标数据。执行

data_frame.filter('integer_value > 1').show()

,得到结果:

+-------------+--------------------+
|integer_value|          text_value|
+-------------+--------------------+
|            2|                   b|
|            3|abcdefghijklmnopq...|
+-------------+--------------------+
列名

直接

.列名

就可以获得列对象,执行

print(data_frame.integer_value)

,得到结果

Column<'integer_value'>

;执行

data_frame.select(data_frame.integer_value).show()

,得到结果

+-------------+
|integer_value|
+-------------+
|            1|
|            2|
|            3|
+-------------+

DataFrame的修改

toPandas
toPandas

方法用于把

pyspark

DataFrame

类型数据转换为

pandas

DataFrame

类型数据。和

collect

方法一样,如果数据量太大,可能导致

out-of-memory

异常。

withColumn
withColumn

会生成一个新的

DataFrame

实例,内部数据和有原有

DataFrame

数据新增或更新一列,后相同,原有

DataFrame

的值不变。
执行

data_frame.withColumn('upper_text', pyspark.sql.functions.upper(data_frame.text_value)).show()

,得到结果:

+-------------+--------------------+--------------------+
|integer_value|          text_value|          first_char|
+-------------+--------------------+--------------------+
|            1|                   a|                   A|
|            2|                   b|                   B|
|            3|abcdefghijklmnopq...|ABCDEFGHIJKLMNOPQ...|
+-------------+--------------------+--------------------+

再执行

data_frame.show()

,还是和调用

withColumn

方法前一样。

createOrReplaceTempView

执行下面代码

data_frame.createOrReplaceTempView('temp_table')
spark.sql('select * from temp_table').show()

代码将

DataFrame

转换为临时视图,并查看此临时视图的内容得到结果如下:

+-------------+--------------------+
|integer_value|          text_value|
+-------------+--------------------+
|            1|                   a|
|            2|                   b|
|            3|abcdefghijklmnopq...|
+-------------+--------------------+
标签: spark

本文转载自: https://blog.csdn.net/AuspiciousChan/article/details/135309043
版权归原作者 AuspiciousChan 所有, 如有侵权,请联系我们删除。

“【pyspark从入门到放弃】DataFrame”的评论:

还没有评论