PySQL
文章目录
使用Spark RDD实现单词计数
#构建SparkSession和SparkContext实例
spark = SparkSession.builder
.master("spark://panyx:7077").appName("demo").getOrCreate()
sc = spark.sparkContext()# 读取数据源文件,构造一个RDD
source ="/data/spark/word.txt"
sc.textFile(source).flatMap(lambda line: line.split(" "))# 将每行数据按空格拆分单词.map(lambda word:(word,1))# 将每个单词加上计数值 1.reduceByKey(lambda a,b:a + b)# 将所有相同的单词进行聚合相加求各单词总数.collect()# 返回结果给Drive程序触发RDD计算(Action)# 将计算结果保存在文件中 --Action
dataSink ="data/spark/word-result"
sc.saveAsTextFile(dataSink)# 关闭SparkSession
spark.stop()
创建RDD
三种创建方式
- 将现有集合并行化;
# 构造一个RDD
rdd = sc.parallelize(range(1,21,2))
- 加载外部存储系统的数据集;
# 从文本文件创建RDD
rdd = sc.textFile("hdfs://path/to/file.txt")# 从CSV文件创建RDD
df = spark.read.csv("hdfs://path/to/file.csv")
rdd = df.rdd
# 从JSON文件创建RDD
df = spark.read.json("hdfs://path/to/file.json")
rdd = df.rdd
# 从Parquet文件创建RDD
df = spark.read.parquet("hdfs://path/to/file.parquet")
rdd = df.rdd
- 在现有RDD上进行转换得到新的RDD;
# 通过转换操作 (transformations) 来创建新的 RDD# map()、filter()、flatMap()、distinct()等
new_rdd = old_rdd.map(lambda x: x *2)
操作RDD
两种类型的操作:
Transformation
和
Action
- Transformation: 是定义如何创建RDD的延迟操作,每当在RDD上执行
Transformation
转换时,都会生成一个新的RDD; - 常见的Transformation
操作:map()
,flatMap()
,filter()
,distinct()
- Action: 返回一个结果给驱动程序或将结果存入存储的操作,并开始执行计算; - 常见的
Action
操作:
action操作描述reduce(func)使用函数func对RDD中的元素进行聚合计算collect()将RDD操作的所有结果返回给驱动程序。这通常对产生足够小的RDD的操作很有用count()返回RDD中的元素数量first()返回RDD的第一个元素。它的工作原理类似于take(1)函数take(n)返回RDD的前n个元素。它首先扫描一个分区,然后使用该分区的结果来估计满足该限制所需的其他分区的数量。这个方法应该只在预期得到的数组很小的情况下使用,因为所有的数据都加载到驱动程序的内存中top(n)按照指定的隐式排序从这个RDD中取出最大的n个元素,并维护排序。这与takeOrdered相反。这个方法应该只在预期得到的数组很小的情况下使用,因为所有的数据都加载到驱动程序的内存中takeSample返回一个数组,其中包含来自RDD的元素的抽样子集takeOrdered(n)返回RDD的前n个(最小的)元素,并维护排序。这和top是相反的。这个方法应该只在预期得到的数组很小的情况下使用,因为所有的数据都加载到驱动程序的内存中saveAsTextFile(path)将RDD的元素作为文本文件(或文本文件集)写入本地文件系统、HDFS或任何其他hadoop支持的文件系统的给定目录中。Spark将对每个元素调用toString,将其转换为文件中的一行文本foreach(func)在RDD的每个元素上运行函数funcaggregate类似于reduce,执行聚合运算,但它可以返回具有与输入元素数据类型不同的结果。这个函数聚合每个分区的元素,然后使用给定的combine组合函数和一个中性的“零值”,对所有分区的结果进行聚合
```
针对key/value pair RDD,PySpark专门提供了一些操作。
Transformation API,包括reduceByKey()、groupByKey()、sortByKey()和join()等。
Action API,包括countByKey()、collectAsMap()等。
reduceByKey():使用reduce()函数合并每个key的值,其重复地应用于具有多个分区的同一组RDD数据时,它首先使用reduce()函数在各个分区本地执行合并,然后跨分区发送记录以准备最终结果。
aggregateByKey():PySpark的aggregateByKey()转换操作聚合每个key的值,使用给定的聚合函数和一个中性的“零值”,并为该key返回不同类型的值
combineByKey():使用一组自定义的聚合函数组合每个key的元素,在其内部combineByKey()操作会按分区合并元素。它也是一个宽依赖的操作,在最后阶段需要shuffle数据
##### 持久化RDD
要缓存RDD,常用到两个函数:cache()和persist()
> 可以使用persist()方法将一个RDD标记为持久化。之所以说“标记为持久化”,是因为出现persist()语句的地方,并不会马上计算生成RDD并把它持久化,而是要等到遇到第一个Action操作触发真正计算以后,才会把计算结果进行持久化。持久化后的RDD分区将会被保留在计算节点的内存中,被后面的Action操作重复使用。
##### 基于数据分区的操作
函数名函数签名描述mapPartitionsmapPartitions(f, preservesPartitioning=False)通过对该RDD的每个分区应用函数f,返回一个新的RDDmapPartitionsWithIndexmapPartitionsWithIndex(f, preservesPartitioning=False)通过对该RDD的每个分区应用函数,同时跟踪原始分区的索引,返回一个新的RDDforeachPartitionforeachPartition(f)将一个函数f应用到这个RDD的每个分区glomglom()返回通过将每个分区中的所有元素合并为一个列表而创建的RDD
1. **RDD的创建**
使用
SparkContext
的
parallelize
方法可以创建RDD:
from pyspark import SparkContext
sc = SparkContext("local","Create RDD example")
data =[1,2,3,4,5]
rdd = sc.parallelize(data)
还可以从外部数据源创建RDD,例如从HDFS、CSV文件、数据库等:
从HDFS上的文本文件创建RDD
rdd = sc.textFile("hdfs://path/to/file.txt")
1. **过滤**
使用
filter
函数可以根据条件筛选RDD中的元素:
filtered_rdd = rdd.filter(lambda x: x %2==0)# 筛选出偶数
filtered_rdd.collect()
1. **去重**
使用
distinct
函数可以去重:
distinct_rdd = rdd.distinct()# 去重,返回唯一的元素
distinct_rdd.collect()
1. **排序**
使用
sortBy
函数可以对RDD进行排序:
sorted_rdd = rdd.sortBy(lambda x: x, ascending=False)# 按降序排序
sorted_rdd.collect()
1. **分组**
使用
groupBy
函数可以根据指定的键对RDD进行分组:
grouped_rdd = rdd.groupBy(lambda x: x %2)# 按奇偶分组
grouped_rdd.collect()# 输出每个键及其对应的元素组,例如[(0, [0, 2, 4]), (1, [1, 3])]
1. **映射 (Map)**
from pyspark import SparkContext
sc = SparkContext("local","Map example")
rdd = sc.parallelize([1,2,3,4,5])
mappedRDD = rdd.map(lambda x: x *2)# [2, 4, 6, 8, 10]
mappedRDD.collect()
1. **交集 (Intersection)**
rdd1 = sc.parallelize([1,2,3,4])
rdd2 = sc.parallelize([3,4,5,6])
intersectionRDD = rdd1.intersection(rdd2)# [3, 4]
intersectionRDD.collect()
1. **并集 (Union)**
rdd1 = sc.parallelize([1,2,3])
rdd2 = sc.parallelize([3,4,5])
unionRDD = rdd1.union(rdd2)# [1, 2, 3, 4, 5]
unionRDD.collect()
1. **差集 (Difference)**
rdd1 = sc.parallelize([1,2,3,4])
rdd2 = sc.parallelize([3,4,5])
differenceRDD = rdd1.subtract(rdd2)# [1, 2]
differenceRDD.collect()
1. **连接 (Join)** 假设有两个键值对RDD:```rdd1``` 和 ```rdd2```,我们想通过键进行连接:
rdd1 = sc.parallelize([("a",1),("b",2),("c",3)])
rdd2 = sc.parallelize([("a","X"),("b","Y"),("d","Z")])
joinedRDD = rdd1.join(rdd2)# [("a", (1, "X")), ("b", (2, "Y")), ("c", (3, None))]
joinedRDD.collect()
1. **统计** 统计RDD中的元素数量:
rdd = sc.parallelize([1,2,3,4])
count = rdd.count()# 输出: 4,表示有4个元素。 print(count)
#### Spark SQL
##### 创建DataFrame对象
###### 1. 通过
load()
方法创建DataFrame
可以通过
load()
方法将HDFS上的格式化文件转换为DataFrame,
load()
默认导入的文件格式是Parquet
- 使用Parquet文件
dfUsers=spark.read.load("file:/user/hadoop/users.parquet")# 通过show()方法展示数据
dfUsers.show()
- 使用json文件
dfUsers=spark.read.format("json").load("file:/user/hadoop/users.json")# 通过show()方法展示数据
dfUsers.show()
###### 2. 通过SparkSession方式创建DataFrame
SparkSession
的创建
from pyspark.sql import SparkSession
链式表达
spark = SparkSession
.builder # 构造SparkSession对象.appName("demo")# 设置Spark应用程序名称,不设置就随机.config("spark.some.config.option","some-value")# 设置配置选项.master("local[4]")# 设置连接的Spark主机master的URL,local[4]在本地使用4核运行.getOrCreate()# 如果已存在就get,否则Create
- **调用createDataFrame()创建DataFrame**
createDataFrame(data, schema)
中的data用来指定创建DataFrame对象的数据,可以是RDD、Python的列表list或Pandas的DataFrame对象;schema用来指定DataFrame的数据模式,可以是pyspark.sql.types类型指定的字段名和字段名数据类型的列表
data =[("apple","apple pie"),("banana","banana split"),("cherry","cherry cola")]
df = spark.createDataFrame(data,["fruit","dessert"])
- **使用range(start, end, step, numPartitions)方法创建一个列名为id的DataFrame,numPartitions是分区的意思**
- **使用spark.read.***()方法从不同类型的文件中加载数据创建DataFrame**
使用spark.read.***()方法从不同类型的文件中加载数据创建DataFrame
spark.read.csv(".csv")
spark.read.json(".json")
spark.read.parquet("***.parquet")
##### DataFrame的保存
###### 1. 通过write.***()方法保存DataFrame对象
json
data.write.json("file:/home/data.json")# parquet
data.write.parequet("file/home/data.parquet")# csv
data.write.csv("file/home/data.csv")
###### 2. 通过write.format()方法保存DataFrame对象
json
data.write.format("json").save("file:/home/data.json")# parquet
data.write.format("parquet").save("file:/home/data.parquet")# csv
data.write.format("csv").save("file:/home/data.csv")
**先将DataFrame转化成RDD再保存到文件中**
data.rdd.saveAsTextFile("file:/home/hadoop/grade")
##### DataFrame的常用操作
###### 1. Row操作
1. **可使用Row类的对象创建DataFrame**```from pyspark.sql import functions as ffrom pyspark.sql import Rowrow1 = Row(name='Wang', spark=89, python=85)#创建Row类的对象print(type(row1))#查看row1的类型 <class 'pyspark.sql.types.Row'>row2 = Row(name='Li', spark=95, python=86)row3 = Row(name='Ding', spark=90, python=88)rdd = sc.parallelize([row1,row2,row3])#利用Row对象创建RDDdf = rdd.toDF()#将RDD对象转换为DataFrame,可以指定新的列名df.show()```
**重点单词
parallelize
**
1. **Row类的对象的列表直接创建DataFrame**```DataFrame2 = spark.createDataFrame([row1,row2,row3])```
2. **asDict()方法将其转换为字典对象**```row1.asDict()# {'name': 'Wang', 'spark': 89, 'python': 85}```
###### 2. Column操作
1. **alias()方法可对输出的列重命名**```df.select('name',df.spark.alias("SPARK")).show(2)#对spark列重命名```
2. **对列进行排列**```# asc()-升序,desc()-降序df.select(df.spark,df.python).orderBy(df.spark.asc()).show()```
3. **改变列的数据类型**```# 调用astype()方法改变列的数据类型。df.select(df.spark.astype('string').alias('Spark')).collect()```
4. **按条件筛选**- **betwen**```# between(low,upper)df.select(df.name, df.python.between(85,87)).show()```- **when**```# when(condition, value1).otherwise(value2)df.select(df.name,df.when(df.spark >90,1).when(df.spark <90,2).otherwise(0)).show()```- **contains**```# 过滤掉所有城市列中不包含字符串”Los”的行filtered_df = df.filter(~col("City").contains("Los"))filtered_df.show()```- **withColumn**```# withColumn()方法更改列的值、转换DataFrame中已存在的列的数据类型、添加或者创建一个新的列from pyspark.sql.functions import coldf3 = df.withColumn("python",col("python")*100)#更改列的值#使用现有列添加新列df4 = df.withColumn("PYTHON",col("python")*10)```- **substr**```# substr(startPos,length)方法获取从startPos索引下标开始,长度为length的子字符串result = df.withColumn("substring", df["text"].substr(1,3))```
###### 3. 常用属性
1. **输出**```# 1. show() show()方法输出数据,默认输出20 show(n) 输出前n条 show(truncate=false) 全部展示# 2. collect() 获取所有数据到list列表,list中每个元素是Row类型 # 3. printSchema() 可查看一个DataFrame对象中有那些列,这些列是什么样的数据类型,即打印出字段名称和类型# 4. count() gradedf.count()# 输出DataFrame对象的行数# 5.使用first()、head()、take()方法获取若干行记录(1)first()返回第一行记录 gradedf.first()(2)head()获取第一行记录,head(n)获取前n行记录 gradedf.head (2)#获取前2行记录(3)take(n)获取前n行记录 gradedf.take(2)# 6. distinct gradedf.distinct().show()# 返回一个不包含重复记录的DataFrame对象# 7. dropDuplicates gradedf.dropDuplicates(["Spark"]).show()#根据Spark字段去重```
2. **筛选**```# 1. where gradedf.where("Class ='1' and Spark = '91'").show()# 2. drop gradedf.drop("ID","Spark").show(3)# 去除ID,Spark字段# 3. select gradedf.select("Class","Name","Scala").show(3,False)#输出筛选的数据时对列进行重命名 gradedf.select("Name","Scala").withColumnRenamed("Name","NAME").show(2)# 4. alias gradedf.select("Name",gradedf.Spark.alias("spark")).show(3)# 对列重命名# 5. selectExpr# 可以直接对指定字段调用用户自定义函数,或者指定别名等。传入字符串类型Expr参数 gradedf.selectExpr("Name","Name as Names","upper(Name)","Scala*10").show(3)```
3. **排序**```# 1. orderBy()和sort()# 默认升序 gradedf.orderBy("Spark","Scala").show(5) gradedf.sort("Spark","Scala",ascending=False).show(5)# 2. sortWithinPartitions# 和上面的sort()方法功能类似,区别在于sortWithinPartitions()方法返回的是按Partition排好序的DataFrame对象。```
4. **汇总与聚合**```# 1. groupBy# groupBy()按某些字段分组 (1)结合count()方法统计每组的记录数 gradedf.groupBy("Class").count().show() (2)结合max(colNames)方法获取分组指定字段colNames的最大值 只能作用于数字型字段。 gradedf.groupBy("Class").max("Scala","Spark").show() (4)结合sum(colNames)方法获取分组指定字段的和值只能作用于数字型字段。 gradedf.groupBy("Class").sum("Spark","Scala").show() (5)结合mean(colNames)方法获取分组指定字段的平均值只能作用于数字型字段。 gradedf.groupBy("Class").mean("Spark","Scala").show()# 2. agg聚合操作# agg()可以同时对多个列进行操作,生成所需要的数据。from pyspark.sql import functions as f gradedf.agg(f.min(gradedf.Spark),f.max(gradedf.Spark)).show()```
5. **合并**```unionAll(other:Dataframe)方法用于合并两个DataFrame对象,unionAll()方法并不是按照列名和并,而是按照位置合并的,对应位置的列将合并在一起,列名不同并不影响合并,两个DataFrame对象的字段数必须相同。gradedf.select("Name","Scala","Spark").unionAll( df.select ("name","spark","python")).show()```
6. **连接**```# join(1)单字段连接 # 通过两个DataFrame对象的一个相同字段将两个DataFrame对象连接起来。 df1.join(df2,"name").show()#"name"是df1和df2相同的字段(2)多字段连接 # 通过两个DataFrame对象的多个相同字段将两个DataFrame对象连接起来。#"name"、"Java"是df1和df2相同的字段 df1.join(df2,["name","Java"]).show()(3)指定join连接类型 # inner, outer, left_outer, right_outer, leftsemi df1.join(df2,["name","Java"],"inner").show()# to系列方法主要包括toDF、toJSON、toPandas、toLocalIterator ```
##### 读写数据库
**连接**
jdbcDF = spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/class").option("driver","com.mysql.cj.jdbc.Driver").option("dbtable","student").option("user","newuser").option("password","hadoop").load()
```
版权归原作者 吧啦吧啦! 所有, 如有侵权,请联系我们删除。