0


Pyspark dataframe基本内置方法(5)

文章目录

Pyspark sql DataFrame

相关文章

Pyspark下操作dataframe方法(1)
Pyspark下操作dataframe方法(2)
Pyspark下操作dataframe方法(3)
Pyspark下操作dataframe方法(4)
Pyspark下操作dataframe方法(5)

toDF 设置新列名

列名更新,将会按照新列名顺序的替换原列名返回新dataframe,更新列名数量需要跟原始列名数量一致。

  1. from pyspark.sql.functions import lit
  2. data.show()
  3. +-----+---+---+------+------+
  4. | name|age| id|gender|new_id|
  5. +-----+---+---+------+------+
  6. | ldsx| 12| 1| 男| 1|
  7. |test1| 20| 1| 女| 1|
  8. |test2| 26| 1| 男| 1|
  9. |test3| 19| 1| 女| 1|
  10. |test4| 51| 1| 女| 1|
  11. |test5| 13| 1| 男| 1|
  12. +-----+---+---+------+------+
  13. data.toDF(*['n1','n2','n3','n5','n4']).show()
  14. +-----+---+---+---+---+
  15. | n1| n2| n3| n5| n4|
  16. +-----+---+---+---+---+
  17. | ldsx| 12| 1| 男| 1|
  18. |test1| 20| 1| 女| 1|
  19. |test2| 26| 1| 男| 1|
  20. |test3| 19| 1| 女| 1|
  21. |test4| 51| 1| 女| 1|
  22. |test5| 13| 1| 男| 1|
  23. +-----+---+---+---+---+

toJSON row对象转换json字符串

把dataframe的row对象转换为json字符串,返回rdd

  1. data.rdd.first()
  2. Row(name='ldsx', age='12', id='1', gender='男', new_id='1')
  3. # data.toJSON()返回rdd类型
  4. data.toJSON().first()
  5. '{"name":"ldsx","age":"12","id":"1","gender":"男","new_id":"1"}'

toLocallterator 获取迭代器

返回一个迭代器,其中包含此DataFrame中的所有行。迭代器将消耗与此DataFrame中最大分区一样多的内存。通过预取,它可能会消耗最多2个最大分区的内存。

  1. d1 = data.toLocalIterator()
  2. d1
  3. <generator object _local_iterator_from_socket.<locals>.PyLocalIterable.__iter__ at 0x7f55c86e0570>
  4. # 便利迭代器
  5. for i in d1:
  6. print(i)
  7. Row(name='ldsx', age='12', id='1', gender='男', new_id='1')
  8. Row(name='test1', age='20', id='1', gender='女', new_id='1')
  9. Row(name='test2', age='26', id='1', gender='男', new_id='1')
  10. Row(name='test3', age='19', id='1', gender='女', new_id='1')
  11. Row(name='test4', age='51', id='1', gender='女', new_id='1')
  12. Row(name='test5', age='13', id='1', gender='男', new_id='1')

toPandas 转换python dataframe

需要python环境安装pandas的前提下使用,且dataframe需要很小,因为所有数据都加载到driver的内存中。

  1. data.toPandas()
  2. type(data.toPandas())
  3. <class 'pandas.core.frame.DataFrame'>
  4. name age id gender new_id
  5. 0 ldsx 12 1 1
  6. 1 test1 20 1 1
  7. 2 test2 26 1 1
  8. 3 test3 19 1 1
  9. 4 test4 51 1 1
  10. 5 test5 13 1 1

transform dataframe转换

参数为处理函数,返回值必须为dataframe

  1. data.show()
  2. +-----+---+---+------+------+
  3. | name|age| id|gender|new_id|
  4. +-----+---+---+------+------+
  5. | ldsx| 12| 1| 男| 1|
  6. |test1| 20| 1| 女| 1|
  7. |test2| 26| 1| 男| 1|
  8. |test3| 19| 1| 女| 1|
  9. |test4| 51| 1| 女| 1|
  10. |test5| 13| 1| 男| 1|
  11. +-----+---+---+------+------+
  12. # 处理函数自定义最后返回了dataframe
  13. def ldsx(spark_df):
  14. colums = [ str(i)+'_ldsx' for i in range(len(spark_df.columns)) ]
  15. return spark_df.toDF(*colums)
  16. data.transform(ldsx).show()
  17. +------+------+------+------+------+
  18. |0_ldsx|1_ldsx|2_ldsx|3_ldsx|4_ldsx|
  19. +------+------+------+------+------+
  20. | ldsx| 12| 1| 男| 1|
  21. | test1| 20| 1| 女| 1|
  22. | test2| 26| 1| 男| 1|
  23. | test3| 19| 1| 女| 1|
  24. | test4| 51| 1| 女| 1|
  25. | test5| 13| 1| 男| 1|
  26. +------+------+------+------+------+

union unionALL 并集不去重(按列顺序)

获得新dataframe,unionall别名为union,如果要去重使用distinct方法,不会解析对应的列名合并,是按照列的顺序合并的,硬合

  1. df2 = spark.createDataFrame([(3, 'C'), (4, 'D')], ['id', 'value'])
  2. df1 = spark.createDataFrame([(1, 'A'), (2, 'B'),(3, 'C'),(3, 'C')], ['id', 'value'])
  3. df1.show()
  4. +---+-----+
  5. | id|value|
  6. +---+-----+
  7. | 1| A|
  8. | 2| B|
  9. | 3| C|
  10. | 3| C|
  11. +---+-----+
  12. df2.show()
  13. +---+-----+
  14. | id|value|
  15. +---+-----+
  16. | 3| C|
  17. | 4| D|
  18. +---+-----+
  19. df1.union(df2)
  20. DataFrame[id: bigint, value: string]
  21. df1.union(df2).show()
  22. +---+-----+
  23. | id|value|
  24. +---+-----+
  25. | 1| A|
  26. | 2| B|
  27. | 3| C|
  28. | 3| C|
  29. | 3| C|
  30. | 4| D|
  31. +---+-----+
  32. # 去重使用distinct
  33. df1.union(df2).distinct().show()
  34. +---+-----+
  35. | id|value|
  36. +---+-----+
  37. | 2| B|
  38. | 1| A|
  39. | 3| C|
  40. | 4| D|
  41. +---+-----+

unionByName 并集不去重(按列名)

是否允许缺失列:allowMissingColumns,默认不允许

  1. # 按照列名合并
  2. df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
  3. df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col0"])
  4. df1.unionByName(df2).show()
  5. +----+----+----+
  6. |col0|col1|col2|
  7. +----+----+----+
  8. | 1| 2| 3|
  9. | 6| 4| 5|
  10. +----+----+----+
  11. # 对于不存在列进行填补
  12. df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
  13. df2 = spark.createDataFrame([[4, 5, 6, 7]], ["col1", "col2", "col3", "col4"])
  14. # allowMissingColumns True默认填补null
  15. df1.unionByName(df2, allowMissingColumns=True).show()
  16. +----+----+----+----+----+
  17. |col0|col1|col2|col3|col4|
  18. +----+----+----+----+----+
  19. | 1| 2| 3|NULL|NULL|
  20. |NULL| 4| 5| 6| 7|
  21. +----+----+----+----+----+

unpivot 反转表(宽表转长表)

ids: 标识列
values:选中的列(LIST)
variableColumnName: 列名
valueColumnName:对应列的值

宽表转长表,一行变多行,除了选中的ids是不变的,但是会把选中的values中的列由列变成行记录,variableColumnName记录了反转前的列名,

valueColumnName 对应 variableColumnName 存储值。

  1. data.show()
  2. +-----+---+---+------+------+
  3. | name|age| id|gender|new_id|
  4. +-----+---+---+------+------+
  5. | ldsx| 12| 1| 男| 1|
  6. |test1| 20| 1| 女| 1|
  7. |test2| 26| 1| 男| 1|
  8. |test3| 19| 1| 女| 1|
  9. |test4| 51| 1| 女| 1|
  10. |test5| 13| 1| 男| 1|
  11. +-----+---+---+------+------+
  12. # 一行变成三行,id不变 'age','name','gender'由列转行,c_col依次记录'age','name','gender',c_value则记录对应的值
  13. data.unpivot('id',['age','name','gender'],'c_col','c_value').show()
  14. +---+------+-------+
  15. | id| c_col|c_value|
  16. +---+------+-------+
  17. | 1| age| 12|
  18. | 1| name| ldsx|
  19. | 1|gender| 男|
  20. | 1| age| 20|
  21. | 1| name| test1|
  22. | 1|gender| 女|
  23. | 1| age| 26|
  24. | 1| name| test2|
  25. | 1|gender| 男|
  26. | 1| age| 19|
  27. | 1| name| test3|
  28. | 1|gender| 女|
  29. | 1| age| 51|
  30. | 1| name| test4|
  31. | 1|gender| 女|
  32. | 1| age| 13|
  33. | 1| name| test5|
  34. | 1|gender| 男|
  35. +---+------+-------+

withColumn 添加列操作

通过添加列或替换具有相同名称的现有列来返回新的DataFrame。
列表达式必须是此DataFrame上的表达式;列只能引用此数据集提供的属性。添加引用其他数据集的列是错误的。

可以使用lit设置常量作为列

可以使用表达式设置列

  1. # 使用d1上的列或者用常量列
  2. d1.withColumn('c_value2',d1.c_value).show()
  3. +---+------+-------+--------+
  4. | id| c_col|c_value|c_value2|
  5. +---+------+-------+--------+
  6. | 1| age| 12| 12|
  7. | 1| name| ldsx| ldsx|
  8. | 1|gender| 男| 男|
  9. | 1| age| 20| 20|
  10. | 1| name| test1| test1|
  11. | 1|gender| 女| 女|
  12. | 1| age| 26| 26|
  13. | 1| name| test2| test2|
  14. | 1|gender| 男| 男|
  15. | 1| age| 19| 19|
  16. | 1| name| test3| test3|
  17. | 1|gender| 女| 女|
  18. | 1| age| 51| 51|
  19. | 1| name| test4| test4|
  20. | 1|gender| 女| 女|
  21. | 1| age| 13| 13|
  22. | 1| name| test5| test5|
  23. | 1|gender| 男| 男|
  24. +---+------+-------+--------+
  25. # 使用常量补充列
  26. from pyspark.sql.functions import lit
  27. d1.withColumn('c_value2',lit('ldsx')).show()
  28. +---+------+-------+--------+
  29. | id| c_col|c_value|c_value2|
  30. +---+------+-------+--------+
  31. | 1| age| 12| ldsx|
  32. | 1| name| ldsx| ldsx|
  33. | 1|gender| 男| ldsx|
  34. | 1| age| 20| ldsx|
  35. | 1| name| test1| ldsx|
  36. | 1|gender| 女| ldsx|
  37. | 1| age| 26| ldsx|
  38. | 1| name| test2| ldsx|
  39. | 1|gender| 男| ldsx|
  40. | 1| age| 19| ldsx|
  41. | 1| name| test3| ldsx|
  42. | 1|gender| 女| ldsx|
  43. | 1| age| 51| ldsx|
  44. | 1| name| test4| ldsx|
  45. | 1|gender| 女| ldsx|
  46. | 1| age| 13| ldsx|
  47. | 1| name| test5| ldsx|
  48. | 1|gender| 男| ldsx|
  49. +---+------+-------+--------+
  50. # 使用表达式设置列
  51. data = [(1,), (2,), (3,), (4,)]
  52. df = spark.createDataFrame(data, ["number"])
  53. df.show()
  54. +------+
  55. |number|
  56. +------+
  57. | 1|
  58. | 2|
  59. | 3|
  60. | 4|
  61. +------+
  62. from pyspark.sql.functions import col, when
  63. df.withColumn("new_number", when(df.number < 3, "Low").otherwise("High")).show()
  64. ------+----------+
  65. |number|new_number|
  66. +------+----------+
  67. | 1| Low|
  68. | 2| Low|
  69. | 3| High|
  70. | 4| High|
  71. +------+----------+

withColumns 添加多列操作

通过添加列或替换具有相同名称的现有列来返回新的DataFrame。
列表达式必须是此DataFrame上的表达式;列只能引用此数据集提供的属性。添加引用其他数据集的列是错误的。

可以使用lit设置常量作为列

可以使用表达式设置列

  1. df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
  2. df.withColumns({'age2': df.age + 2, 'age3': df.age + 3}).show()
  3. +---+-----+----+----+
  4. |age| name|age2|age3|
  5. +---+-----+----+----+
  6. | 2|Alice| 4| 5|
  7. | 5| Bob| 7| 8|
  8. +---+-----+----+----+
  9. # 可使用表达式
  10. df.withColumns({'h1': when(df.age < 2, "Low").otherwise("High"), 'h2': df.age + 3}).show()
  11. +---+-----+----+---+
  12. |age| name| h1| h2|
  13. +---+-----+----+---+
  14. | 2|Alice|High| 5|
  15. | 5| Bob|High| 8|
  16. +---+-----+----+---+

withColumnRenamed 列重命名

不存在的列重命名报错,返回新dataframe。

列,重命名列

  1. df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
  2. df.withColumnRenamed('age', 'age2').show()
  3. +----+-----+
  4. |age2| name|
  5. +----+-----+
  6. | 2|Alice|
  7. | 5| Bob|
  8. +----+-----+

withColumnsRenamed 多列重命名

字典,列名的映射

  1. df.withColumnsRenamed({'age':'new_age','name':'new_name'}).show()
  2. +-------+--------+
  3. |new_age|new_name|
  4. +-------+--------+
  5. | 2| Alice|
  6. | 5| Bob|
  7. +-------+--------+

withMetadata 设置元数据

更新元数据,返回新dataframe

  1. df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
  2. # 查看列的元数据
  3. df.schema['age'].metadata
  4. {}
  5. # 设置元数据
  6. df_meta = df.withMetadata('age', {'foo': 'bar'})
  7. df_meta.schema['age'].metadata
  8. {'foo': 'bar'}

write 存储表

write.saveAsTable

当追加插入的时候dataframe只需要scheam一致,会自动匹配

  • name: str, 表名
  • format: Optional[str] = None, 格式类型 hive,parquet…
  • mode: Optional[str] = None, 写入方式1. append:将this:class:DataFrame的内容附加到现有数据中,数据格式需要一致。2. “overwrite”:覆盖现有数据,数据格式不重要了,已此次覆盖为准。3. errorerrorifeists:如果数据已经存在,则抛出异常。4. ‘ignore’:如果数据已经存在,则自动忽略此操作。
  • partitionBy: Optional[Union[str, List[str]]] = None, 分区列表
  1. df.show()
  2. +---+-----+
  3. |age| name|
  4. +---+-----+
  5. | 2|Alice|
  6. | 5| Bob|
  7. +---+-----+
  8. # 覆盖重写
  9. df.write.saveAsTable('ldsx_test','parquet','overwrite',['age'])
  10. # 追加写入
  11. df.write.saveAsTable('ldsx_test','parquet','append',['age'])
  12. # 另一种写法
  13. df.write.format('parquet').mode('append').partitionBy(['age']).saveAsTable('ldsx_test')

在这里插入图片描述

在这里插入图片描述

insertInto

不会对scheam进行校验,按位置插入

  1. d2.show()
  2. +-----+----+
  3. |name1|age1|
  4. +-----+----+
  5. |ldsx1| 2|
  6. |ldsx2| 3|
  7. +-----+----+
  8. d2.write.insertInto('ldsx_test')
  9. d2.schema
  10. StructType([StructField('name1', StringType(), True), StructField('age1', LongType(), True)])

本文转载自: https://blog.csdn.net/weixin_43322583/article/details/142377270
版权归原作者 百流 所有, 如有侵权,请联系我们删除。

“Pyspark dataframe基本内置方法(5)”的评论:

还没有评论