0


Python之PySpark常用函数示例

文章目录

一、函数使用示例

之前写了PySpark的简单使用,下面汇总下常用函数,及使用示例。

1.

map
  • 作用:对 RDD/Dataset 中的每个元素应用一个转换操作,并返回新的 RDD/Dataset。
  • 示例:rdd = spark.sparkContext.parallelize([1,2,3,4,5])mapped_rdd = rdd.map(lambda x: x *2)print(mapped_rdd.collect())# 输出 [2, 4, 6, 8, 10]

2.

flatMap
  • 作用:对 RDD/Dataset 中的每个元素应用一个返回多个元素的转换操作,并返回扁平化的结果。
  • 示例:rdd = spark.sparkContext.parallelize(["Hello World","Spark is great"])flat_mapped_rdd = rdd.flatMap(lambda x: x.split(" "))print(flat_mapped_rdd.collect())# 输出 ['Hello', 'World', 'Spark', 'is', 'great']

3.

reduceByKey
  • 作用:对键值对类型的 RDD/Dataset 进行分组,并对每个键的值应用聚合函数。
  • 示例:rdd = spark.sparkContext.parallelize([(1,2),(1,4),(2,3),(2,5)])reduced_rdd = rdd.reduceByKey(lambda a, b: a + b)print(reduced_rdd.collect())# 输出 [(1, 6), (2, 8)]

4.

sortBy
  • 作用:对 RDD/Dataset 中的元素进行排序。
  • 示例:rdd = spark.sparkContext.parallelize([5,3,1,4,2])sorted_rdd = rdd.sortBy(lambda x: x)print(sorted_rdd.collect())# 输出 [1, 2, 3, 4, 5]

5.

take
  • 作用:返回 RDD/Dataset 中的前几个元素。
  • 示例:rdd = spark.sparkContext.parallelize([1,2,3,4,5])taken_elements = rdd.take(3)print(taken_elements)# 输出 [1, 2, 3]

6.

distinct
  • 作用:去除 RDD/Dataset 中的重复元素。
  • 示例:rdd = spark.sparkContext.parallelize([1,2,3,2,4,3,5])distinct_rdd = rdd.distinct()print(distinct_rdd.collect())# 输出 [1, 2, 3, 4, 5]

7.

saveAsTextFile
  • 作用:将 RDD/Dataset 保存为文本文件。
  • 示例:rdd = spark.sparkContext.parallelize(["Hello","World","Spark"])rdd.saveAsTextFile("output.txt")

8.

textFile
  • 作用:从文本文件中创建一个 RDD/Dataset。
  • 示例:rdd = spark.sparkContext.textFile("data.txt")print(rdd.collect())

二、函数使用场景及注意事项

下面是关于 Spark 常用函数的使用场景和注意事项的汇总:

以下是 PySpark 常用函数的使用场景和注意事项的汇总:

1.

map
  • 使用场景:对 RDD/Dataset 中的每个元素应用一个转换操作,并返回新的 RDD/Dataset。
  • 注意事项: - 输入和输出的元素类型可以不同。- 在转换操作中,可以使用 lambda 表达式、函数或方法。- 适用于处理每个元素独立的转换操作,如元素翻倍、字符串拆分等。

2.

flatMap
  • 使用场景:对 RDD/Dataset 中的每个元素应用一个返回多个元素的转换操作,并返回扁平化的结果。
  • 注意事项: - 输入和输出的元素类型可以不同。- 在转换操作中,可以使用 lambda 表达式、函数或方法。- 适用于将元素拆分为多个子元素的操作,如字符串拆分成单词、列表展开等。

3.

reduceByKey
  • 使用场景:对键值对类型的 RDD/Dataset 进行分组,并对每个键的值应用聚合函数。
  • 注意事项: - 输入是键值对类型的 RDD/Dataset。- 需要提供一个聚合函数来定义对值的聚合规则。- 适用于按键进行分组并聚合值的操作,如求和、计数等。

4.

sortBy
  • 使用场景:对 RDD/Dataset 中的元素进行排序。
  • 注意事项: - 可以指定一个或多个排序字段。- 默认情况下,按升序排序,可以通过设置参数进行降序排序。- 适用于对元素进行排序的操作。

5.

take
  • 使用场景:返回 RDD/Dataset 中的前几个元素。
  • 注意事项: - 需要提供要获取的元素数量作为参数。- 结果将作为数组返回。- 适用于需要获取前几个元素的操作。

6.

distinct
  • 使用场景:去除 RDD/Dataset 中的重复元素。
  • 注意事项: - 根据元素的值进行去重。- 结果将保留一个唯一的元素集合。- 适用于需要去除重复元素的操作。

7.

saveAsTextFile
  • 使用场景:将 RDD/Dataset 保存为文本文件。
  • 注意事项: - 指定保存路径和文件名。- 可以将 RDD/Dataset 保存为一个或多个文本文件。- 适用于将结果保存到文本文件中的操作。

8.

textFile
  • 使用场景:从文本文件中创建一个 RDD/Dataset。
  • 注意事项: - 指定要读取的文本文件路径。- 可以读取一个或多个文本文件,并将每一行作为 RDD/Dataset 的一个元素。- 适用于从文本文件中加载数据的操作。

三、总结示例

from pyspark.sql import SparkSession

# 创建 SparkSession
spark = SparkSession.builder \
    .appName("Data Example") \
    .getOrCreate()# 读取文本文件作为 DataFrame
data = spark.read.text("data.txt")# 对每一行数据进行转换和处理
processed_data = data.rdd.map(lambda row: row[0].split(" ")) \
    .flatMap(lambda words:[(word,1)for word in words]) \
    .reduceByKey(lambda a, b: a + b)# 将结果按照词频排序
sorted_data = processed_data.sortBy(lambda x: x[1], ascending=False)# 取出前 10 个结果
top_10_words = sorted_data.take(10)# 打印结果for word, count in top_10_words:print(f"{word}: {count}")# 保存结果为文本文件
sorted_data.saveAsTextFile("result.txt")# 关闭 SparkSession
spark.stop()

上述示例演示了以下步骤:

  1. 创建 SparkSession。
  2. 通过 read.text() 方法读取文本文件,并创建一个 DataFrame。
  3. 对每一行数据应用转换操作,使用 map()flatMap() 分别进行单词拆分和计数。
  4. 使用 reduceByKey() 对相同单词的计数进行求和。
  5. 使用 sortBy() 对计数结果进行降序排序。
  6. 使用 take() 提取前 10 个结果,并打印输出。
  7. 使用 saveAsTextFile() 将排序后的结果保存为文本文件。
  8. 关闭 SparkSession。
标签: python 大数据

本文转载自: https://blog.csdn.net/qq_35716085/article/details/135979510
版权归原作者 陈年小趴菜 所有, 如有侵权,请联系我们删除。

“Python之PySpark常用函数示例”的评论:

还没有评论