pyspark官方文档: https://spark.apache.org/docs/latest/api/python/index.html
pyspark案例教程: https://sparkbyexamples.com/pyspark-tutorial/
1. 写在前面
这篇文章记录下最近学习的有关Pyspark以及用spark sql去处理大规模数据的一些常用语法,之前总觉得pandas是做数据分析和数据挖掘的利器, 但是工作之后,面对海量数据(上亿规模),这才发现,普通的pandas几乎毫无用武之力,所以有必要再重新探索下pyspark了,学校的时候也接触了些,但大部分都是关于环境搭建相关的皮毛,对于做数据处理,数据分析等是一点都没有深入,所以工作之后,打算重新去挖一个有关大数据开发的坑,然后从日常使用以及底层原理方面慢慢去填补。
这段时间,也正好利用pyspark的spark dataframe在做一些数据分析和处理工作,所以结合这段时间的使用,整理下常用的一些语法,方便以后回看回练,后面有关于spark的dataframe的操作,集中整合过来。和git一样, 这次依然是采用需求加处理方式的风格,能真正的做到学以致用。
主要内容:
- Pyspark小简介
- RDD篇
- DataFrame篇
Ok, let’s go!
1. Pyspark简介
1.1 背景小识
Pyspark是能在Spark上能运行python程序的一个库(Spark的Python API),有了它,就能在分布式集群上并行运行python程序。
Apache Spark是用于大规模的强大分布式数据处理和机器学习应用程序的分析处理引擎。
原本上Spark是Scala写的,但后来工业上python用的很大,就基于了Py4J发行了python的使用版本,这是一个集成在PySpark中的Java包,允许python自动的与JVM对象对接,所以运行Pyspark,也需要Java和spark。这也就是为啥运行pyspark程序的时候,需要先指定java home、spark home以及py4j包位置所在路径的原因。比如,我之前运行pyspark程序的一个配置:
import os
import sys # sys.path是python的搜索模块的路径集,是一个list
os.environ['SPARK_HOME']='/usr/local/spark'
sys.path.append('/usr/local/spark/python')
sys.path.append('/home/hduser/anaconda3/envs/bigdata_env/bin/python3.5')
sys.path.append('/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip')
优势: 核心就是快,毕竟能用集群,可并行,基于内存操作,容错,惰性
- 分布式处理数据, 快
- 能很方便的处理云存储文件系统的数据,比如HDFS, AWS, S3
- 也能用Streaming和Kafka处理实时数据
- 使用Pyspark流,还可以从文件系统流式传输文件,也可以从套接字流传输。
- Pyspark也有一些机器学习和图形库
pyspark架构:
master-slave架构(dirver-worker), 运行spark程序的时候, spark driver创建一个上下文(程序入口),具体的操作(transformations和actions)是在worker节点上执行,资源被集群管理者统一管理。
1.2 基础组件
关于pyspark的安装,我在学校的时候整理过,这里就写了,这里先一览pyspark的组件和包,从宏观上看看pyspark到底有啥东西。
1.2.1 pyspark RDD
Pyspark的基础数据结构,容错,不可变的分布式对象集合,一旦创建不可改变。RDD中的每个数据集都分为逻辑分区(可以在集群的不同节点上计算)。
创建RDD, 首先需要创建SparkSession(Pyspark应用程序的入口)
# Import SparkSessionfrom pyspark.sql import SparkSession
# Create SparkSession
spark = SparkSession.builder \
.master("local[1]") \
.appName("SparkByExamples.com") \
.getOrCreate()# 另外一种写法 这种写法可以通过指定conf来设置spark的运行模式(local,cluster)# local[*]和上面的这个都是本地模式, 这时候会读取spark/conf下面的default.spark_conf# 所以如果需要做一些配置的话,比如设置excutor的内存大小等,可以在这个spark_conf下面加
conf = SparkConf().setAppName('test').setMaster("local[*]")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = SparkContext
SparkSession在内部创建了一个SparkContext变量,我们能创建多个SparkSession对象,但是每个虚拟机上只能有一个SparkContext,如果想创建另外一个SparkContext,必须用
stop()
方法停掉存在的,否则会报错SparkContext已经存在,这个在jupyter notebook中运行代码的时候容易出现。
创建RDD可以从列表中创建,也可以从一些指定的数据源,也可以从spark的DataFrame转。
# Create RDD from parallelize base a list
dataList =[("Java",20000),("Python",100000),("Scala",3000)]
rdd=spark.sparkContext.parallelize(dataList)# Create RDD from external Data source
rdd2 = spark.sparkContext.textFile("/path/test.txt")
rdd3 = spark_df.rdd
RDD的操作:
- Transformations: 惰性操作,运行一个transformation的时候,其实是返回了另外一个RDD,没有具体的运行,只是将操作记录了下来。 比如
flatMap()、map()、reduceByKey()、fliter()、sortByKey()
等 - Actions: 这时候才开始在计算,返回结果给driver。 比如
count()、collect()、first()、max()、reduce()
等
这个给我的感觉很像tensorflow1.x里面的静态图机制, Transformations就类似于那个建图过程,而Actions就类似于开启Sessions然后具体执行。
1.2.2 Pyspark DataFrame
这里的DataFrame是一个分布式的数据结合,类似于pandas的数据表,但功能更丰富。可以通过结构化数据,hive表,外部数据库或者存在的RDD创建。
PySpark DataFrame与Pandas DataFrame非常相似,而不同点PySpark DataFrames分布在群集中(意味着DataFrame中的数据存储在群集中的不同机器中),并且Pyspark中的任何操作都在所有计算机上并行执行,而Panda DataFrame存储和操作都在一台机器。
关于创建方式,在后面DataFrame那一节会详细整理。DataFrame操作,也是分为了Transformations和Actions。
1.2.3 Pyspark SQL
这个非常常用,可以直接写sql语句来处理DataFrame数据。但操作DataFrame之前,需要先把要操作的DataFrame注册成一个临时表
createTempView()
。
df.createTempView("PERSON_DATA")
df2 = spark.sql("SELECT * from PERSON_DATA")
df2.printSchema()
df2.show()
groupDF = spark.sql("SELECT gender, count(*) from PERSON_DATA group by gender")
groupDF.show()# 不用的时候,一定要删除掉临时表,否则临时表重名了就会报错
spark.catalog.dropTempView("PERSON_DATA")
1.2.4 其他组件
其他的这几个用的不多,简单简介。
- PysparkStreaming这是一个可扩展的,高通量,容错性的流处理系统,支持batch和工作流。用于处理实时数据。 交互:
# 从tcp socket中读取流数据df = spark.readStream .format("socket").option("host","localhost").option("port","9090").load()# kafka中读取流数据df = spark.readStream .format("kafka").option("kafka.bootstrap.servers","192.168.1.100:9092").option("subscribe","json_topic").option("startingOffsets","earliest")// From starting .load()# 写入数据到kafkadf.selectExpr("CAST(id AS STRING) AS key","to_json(struct(*)) AS value").writeStream .format("kafka").outputMode("append").option("kafka.bootstrap.servers","192.168.1.100:9092").option("topic","josn_data_topic").start().awaitTermination()
- Pyspark MLlib 这是一个机器学习库,能够在spark上跑一些简单的机器学习模型。这个目前接触的不是很多,后面有需求再回来补上。
- Pyspark GraphFrames 图形计算库
1.3 SparkSession和SparkContext
1.3.1 SparkSession
SparkSession是Pyspark的入口,是使用RDD、DataFrame或者Dataset之前必须先走的步骤。
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]") \ # 集群模式 local[x]的x要大于0,表示用RDD或者Dataframe的时候,有多少partitions, 理想情况是CPU核数个.appName('SparkByExamples.com') \ # 应用名称.getOrCreate()# 如果有SparkSession就返回,没有就创建新的# Usage of config()
spark = SparkSession.builder \
.master("local[1]") \
.appName("SparkByExamples.com") \
.config("spark.some.config.option","config-value") \ # 这里可以设置spark的配置,或让他读取配置文件.getOrCreate()# Set Config
spark.conf.set("spark.executor.memory","5g")# Get a Spark Config
partitions = spark.conf.get("spark.sql.shuffle.partitions")print(partitions)# Enabling Hive to use in Spark 能使用Hive
spark = SparkSession.builder \
.master("local[1]") \
.appName("SparkByExamples.com") \
.config("spark.sql.warehouse.dir","<path>/spark-warehouse") \
.enableHiveSupport() \
.getOrCreate()
如果想用集群模式,master要指定一个集群的地址,并且集群里面已经完成spark的各种环境搭建。
import os
# 配置spark driver和pyspark运行时,所使用的python解释器路径import sys # sys.path是python的搜索模块的路径集,是一个list
os.environ['JAVA_HOME']='/opt/bigdata/java/jdk1.8'
os.environ['SPARK_HOME']='/opt/bigdata/spark/spark2.2'
os.environ['PYSPARK_PYTHON']='/opt/bigdata/anaconda3/envs/bigdata_env/bin/python3.7'
os.environ['PYSPARK_DRIVER_PYTHON']='/opt/bigdata/anaconda3/envs/bigdata_env/bin/python3.7'
sys.path.append('/opt/bigdata/spark/spark2.2/python')
sys.path.append('/opt/bigdata/spark/spark2.2/python/lib/py4j-0.10.4-src.zip')
sys.path.append('/opt/bigdata/anaconda3/envs/bigdata_env/bin/python3.7')# spark 配置信息from pyspark import SparkConf
from pyspark.sql import SparkSession
SPARK_APP_NAME ="ALSRecommend"
SPARK_URL ="spark://192.168.56.101:7077"# 这个不要写错
conf = SparkConf()# 创建spark config对象
config =(("spark.app.name", SPARK_APP_NAME),# 设置启动的spark的app名称,没有提供,将随机产生一个名称("spark.executor.memory","6g"),# 设置该app启动时占用的内存用量,默认1g("spark.master", SPARK_URL),# spark master的地址("spark.executor.cores","4"),# 设置spark executor使用的CPU核心数# 以下三项配置,可以控制执行器数量# ("spark.dynamicAllocation.enabled", True),# ("spark.dynamicAllocation.initialExecutors", 1), # 1个执行器# ("spark.shuffle.service.enabled", True)# ('spark.sql.pivotMaxValues', '99999'), # 当需要pivot DF,且值很多时,需要修改,默认是10000)# 查看更详细配置及说明:https://spark.apache.org/docs/latest/configuration.html
conf.setAll(config)# 利用config对象,创建spark session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
1.3.2 SparkContext
SparkContext是pyspark的功能切入点,用于与集群通信,创建RDD,累加器和广播变量。每个JVM上只能创建一个。
Spark Driver程序创建并使用SparkContext连接到群集管理器以提交Pyspark作业,并知道与哪个资源管理器(YARN,MESOS或Standalone)进行通信。这是Pyspark应用程序的核心。
# Create SparkSession from builderfrom pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]") \
.appName('SparkByExamples.com') \
.getOrCreate()print(spark.sparkContext)print("Spark App Name : "+ spark.sparkContext.appName)# Outputs#<SparkContext master=local[1] appName=SparkByExamples.com>#Spark App Name : SparkByExamples.com# SparkContext stop() method
spark.sparkContext.stop()# 还可以直接创建# Create Spark Contextfrom pyspark import SparkConf, SparkContext
conf = SparkConf()
conf.setMaster("local").setAppName("Spark Example App")
sc = SparkContext.getOrCreate(conf)print(sc.appName)
通过这一顿操作,后面才能正常用spark做操作。
2. RDD篇
RDD(弹性分布式数据集Resilient Distributed Dataset)是pyspark的基础块,容错,不可变,RDD里面的每条记录被分到了逻辑分区,即可以在不同集群节点上进行运算了。
如果说,pyspark里面的DataFrame与pandas类似,那么这个东西就很像python里面的列表,不同的依然是RDD是可以在多台机器分布式计算,而后者只能在一个进程里面做处理。
RDD提供了数据如何跑在多台机器上的数据抽象分区,并且对用户使用来讲,背后的通过分区,把数据分发到多台机器并行计算完汇总得到结果的过程是个黑盒子,用就完事。
特点:
- 基于内存处理,这是与mapreduce的区别
- 一旦创建,不可变,当在RDD上应用transforamtions操作, pyspark创建的是新的RDD
- 有容错机制,会自动恢复和重试
- 惰性机制,transformation的时候只记录操作,不实际执行,当遇到actions的时候才开始执行
- 创建RDD的时候,会分区,默认是可用核数
Pyspark RDD不适合对状态存储进行更新的应用程序,例如Web应用程序的存储系统。对于这些应用程序,使用执行传统更新日志和数据检查点的系统(例如数据库)更有效。RDD的目的是为批处理分析提供有效的编程模型和保留异步应用程序。
创建方法:已经存在的集合或者是外部存储系统(HDFS)
2.1 RDD创建
创建方法:已经存在的集合或者是外部存储系统(HDFS)
#Create RDD from parallelize 如上图
data =[1,2,3,4,5,6,7,8,9,10,11,12]
rdd=spark.sparkContext.parallelize(data, partitionNums)# 后面可以指定分区数量#Create RDD from external Data source
rdd2 = spark.sparkContext.textFile("/path/textFile.txt")# 上面这两个方法创建RDD的时候, 会自动把数据划分到分区上, 下面这个可以看分区数量print("initial partition count:"+str(rdd.getNumPartitions()))#Outputs: initial partition count:2
2.2 RDD分区
RDD可是根据我们的指定重新分区,pyspark提供了两种方法: repartition和coalesce
- repartition: 把数据根据指定的分区数重新分区, 全量数据的shuffle操作
- coalesce: 改动最小的shuffle,比如现在4个分区,执行coalease(2),此时把两个分区的数据移动到量另外两个上面,而repartition的话,是先全量混合然后重新打散成2个
rdd1.saveAsTextFile("c://tmp/partition2")
rdd2 = rdd1.repartition(4)print("Repartition size : "+str(rdd2.getNumPartitions()))
rdd2.saveAsTextFile("c://tmp/re-partition2")
rdd3 = rdd1.coalesce(4)print("Repartition size : "+str(rdd3.getNumPartitions()))
rdd3.saveAsTextFile("c:/tmp/coalesce2")
对于DataFrame,同样也有这两种分区操作,使用方法上和这个基本上一致
df2 = df.repartition(6)
df3 = df.coalesce(2)
这里需要注意一个点,**在DataFrame上调用
groupBy(),union()
和
join()
等类似函数的时候,会导致重新shffule数据分区**,默认情况下,会重分区成200个partitions,也可以根据spark的一个配置spark.sql.shuffle.partitions调整。
2.3 RDD的操作
上面提到过, RDD主要包括两种操作:
- Transformations: 惰性操作,运行一个transformation的时候,其实是返回了另外一个RDD,没有具体的运行,只是将操作记录了下来。 比如
flatMap()、map()、reduceByKey()、fliter()、sortByKey()
等 - Actions: 这时候才开始在计算,返回结果给driver。 比如
count()、collect()、first()、max()、reduce()
等
这里比较经典的案例,那当属workcount了, 这个可以剖析到每一步到底上面这些操作在干啥,知道原理之后, 我后面在整理个在实际场景中常用的一个需求以及剖析下重要的细节部分。
ok, 下面白嫖下资本家的搭建好的Jupyter Notebook, 快速玩一下上面的这些操作。
2.3.1 workcount
这里先用一个最简单的workcount任务,介绍下上面这些操作的原理, 没有txt,简单写点话做个例子吧。
from pyspark.sql import SparkSession
# 创建spark会话,并获取spark的山下文
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
# 从一个字符串列表创建rdd出来,背后就是把这三句话分区,放到了不同的机器上面
words =["I love me","This is a start for me about pyspark","pyspark is interesting and helpful for me"]
rdd = sc.parallelize(words)# workcount: 第一步, 首先利用flatMap操作, 对每句话操作# 对每句话, 要根据空格拆分成多个单词返回回来, 也就是输入是一条记录,返回的时候成了多条记录,这种操作用flatMap
rdd1 = rdd.flatMap(lambda x: x.split(" "))# workcount: 第二步, map操作,对每个单词初始化计数器, map是输入一条记录,返回一条记录
rdd2 = rdd1.map(lambda x:(x,1))# workcount: 第三步, reduceByKey操作, 对每个键的值,按照指定函数操作,下面这个就是相同key的值相加统计个数
rdd3 = rdd2.reduceByKey(lambda a, b: a + b)# workcount: 第四步, 按照个数从小到大对单词排序
rdd4 = rdd3.map(lambda x:(x[1], x[0])).sortByKey()# 只保留个数多于1的单词
rdd5 = rdd4.filter(lambda x : x[0]>1)
这一套tranformations的组合拳下来,就完成了单词的词频统计,下面就是用actions上真正执行,然后输出了。
print(rdd5.collect())
这样就统计出来了:
这里也可以统计个数大于2的单词一共多少个:
rdd5.count()# 4
rdd5.first()# 返回第一个
rdd5.take(3)# 返回前3个
rdd5.reduce(lambda x, y:(x[0]+y[0],))# 统计一共多少个单词
2.3.2 真实场景
那么, 对于rdd的transformations的这些运算,在真实场景中一般会有啥需求,又怎么操作呢?
场景: 建立数据仓库的时候, 我有上万个parquet文件,每个parquet文件里面, 又会有万条记录,那么此时,我应该如何读取这些数据,然后做一些数据分析,处理或者写入到Hive表里面呢?
这个其实在大数据情况下非常常见, 这个逻辑其实比较简单,利用上面学习到的内容, 几行代码就可以搞定,先分析一下。
分析: 有上万个parquet文件,这些文件我们先放到一个path_list列表里面, 然后对于每个path,我们去相应的path下面去读取parquet数据,这个会返回万条记录的列表。 而不同的path, 可以分配到不同机器上完成这个操作,然后把最终记录汇总合并,得到上亿条记录。 对于每条记录,预处理下,然后转成spark DataFrame就可以分布式挖掘和分析,存储等。
所以这里其实是大数据读取里面一个非常重要的范式, 非常通用,翻译成代码如下:
# 这里需要Row对象,这里对应着dataframe的一条记录,可以提前定义好列from pyspark.sql import Row
import pyarrow.parquet as pq
# 定义下DataFrame的列的名称
data_row = Row('name','age','sex','address','phone')# 给定一个parquet的文件路径,返回下面的所有记录defread_parquet_data(parquet_path):# 把parquet文件转成pd的dataframe
df = pq.read_table(parquet_path).to_pandas()
data = json.loads(df.to_json(orient="records"))
res =[]for i in data:ifnot i:continue
res.append(data_row(i.get("name"), i.get("age"), i.get("sex"), i.get("address"), i.get('phone')))return res
# 处理每条记录, 传入的是一个row对象defprocess(item):
new_item = Row('name','age','sex','address','phone','class')# 可以通过item.name, item.age, item.sex等获取原始数据,这里可以做一些操作if item.phone.startwith('178'):class=1else:class=2return new_item(item.name, item.age, item.sex, item.address, item.phone, item.class)# 下面这个是核心,这里的map也可以换成mapPartitions
raw_data_rdd = sc.parallelize(parquet_path_list).flatMap(read_parquet_data).map(process)
df = spark.createDataFrame(raw_data_rdd)# 如果rdd的列结构类型复杂,这里推断不出来就需要手动指定schema# 下面就可以基于dataframe做一些数据处理分析等了。
注意: 这里process中接受的是一个Row对象,是不能直接通过
item.属性名
去改变值的,如果想改属性名的值,这里需要先把Row对象转成字典,这个其实也很常用。
defprocess_row(row):# 转成字典
d = row.as_Dict()# 对各个特征一顿处理
d['a']=
d['b']=return Row(**d)
spark.createDataFrame(df.rdd.map(process_row))
2.3.3 剖析一个小细节
rdd里面的操作,非常常用的就是map, flatMap以及mapPartitions, 这三个有啥区别呢? 这里简单再总结下:
- map: 对RDD中的每条记录进行操作,返回的也是一条记录,即接收一个row对象,数据处理,返回一个row对象(值)
- mapPartitions: 这个和map的处理方式一样,但是它是对分区的迭代器进行操作,如果是普通的map,比如一个partition中有1万条数据。那么function要执行和计算1万次。使用mapPartitions操作之后,一个task仅仅会执行一次function,function一次接收所有的partition数据。只要执行一次就可以了,性能比较高。缺点是后者容易在partition数据量大的时候,容易爆内存。
- flatMap:这个和上面两个的区别就是flat, 这个会把结果展平,通俗的就是对RDD的每条记录操作,但是返回的是多条记录,即接收一个row对象,返回一个row对象的list。
2.4 广播变量
最后整理下RDD里面的广播,这个应用场景就是,比如要基于某个映射,处理数据的每条记录,并把某列值映射成另一种值,往往可以使用广播变量的方式,把字典广播到每个excutor上面去,可以加快执行效率,减少了通信。
场景: 有上亿条记录,我想把其中的某列数据的国家简称换成全称
这个直接拿官方的例子:
# 先定义一个转换规则的字典
states ={"NY":"New York","CA":"California","FL":"Florida"}# 声明成广播变量# 不加这一行其实也可以做,只不过那样excutor就需要和master通信
broadcastStates = spark.sparkContext.broadcast(states)
data =[("James","Smith","USA","CA"),("Michael","Rose","USA","NY"),("Robert","Williams","USA","CA"),("Maria","Jones","USA","FL")]
rdd = spark.sparkContext.parallelize(data)defstate_convert(code):return broadcastStates.value[code]
result = rdd.map(lambda x:(x[0],x[1],x[2],state_convert(x[3]))).collect()
3. DataFrame篇
3.1 DataFrame创建
首先就是创建DataFrame,我这里大致上梳理三种常用方式。
3.1.1 RDD创建
基于RDD创建的时候,一般会常用两个函数
toDF()
和
createDataFrame()
# 从rdd创建
df = rdd.toDF()# 自己推断schema, 此时schema _1, _2, _3这样的形式# 指定schema
columns =["language","users_count"]
dfFromRDD1 = rdd.toDF(columns)
dfFromRDD1.printSchema()# createDataFrame
dfFromRDD2 = spark.createDataFrame(rdd).toDF(*columns)
创建的时候指定schema, 很常用
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
data2 =[("James","","Smith","36636","M",3000),("Michael","Rose","","40288","M",4000),("Robert","","Williams","42114","M",4000),("Maria","Anne","Jones","39192","F",4000),("Jen","Mary","Brown","","F",-1)]
schema = StructType([ \
StructField("firstname",StringType(),True), \
StructField("middlename",StringType(),True), \
StructField("lastname",StringType(),True), \
StructField("id", StringType(),True), \
StructField("gender", StringType(),True), \
StructField("salary", IntegerType(),True) \
])
df = spark.createDataFrame(data=data2,schema=schema)
df.printSchema()
df.show(truncate=False)
schema的意思,就是要指明DataFrame的列名,以及是啥类型,是否允许为空等,因为spark的DataFrame和pandas的DataFrame在存储上是不同的,所以这里需要指定好schema之后,spark才知道如何存这些列。 对于简单的数据类型, spark的DataFrame还能推断,但是复杂的话,必须得指定schema。
3.1.2 各种数据源读取
这个也非常常用,毕竟对于大数据,总不能自己先写个RDD列表吧。 不过上面读取数据的范式里面,已经写了一个读parquet数据,得到rdd然后转成DataFrame的样例了,下面就是从常见的文件格式中读取数据得到DataFrame。
# csv, textg, json, xml, sql读表
df2 = spark.read.text("/src/resources/file.txt")
df2 = spark.read.json("/src/resources/file.json")
df2 = spark.sql("select * from table where xxx is xxx")
df2 = spark.read.csv("/src/resources/file.csv")# csv的另外写法
df2 = spark.read.format("csv").load("src/resouces/file.csv")# 还可以加一些选项
df2 = spark.read.option("header"=True, delimiter=',', inferSchema='True') \
.csv("/tmp/resources/zipcodes.csv")# 这里面还有很多选项, quotes, nullValues, dataFormat等# 下面这个也很常用
schema = StructType() \
.add("RecordNumber",IntegerType(),True) \
.add("Zipcode",IntegerType(),True) \
.add("ZipCodeType",StringType(),True) \
.add("City",StringType(),True) \
.add("State",StringType(),True) \
.add("Notes",StringType(),True)
df_with_schema = spark.read.format("csv") \
.option("header",True) \
.schema(schema) \
.load("/tmp/resources/zipcodes.csv")
3.1.3 互转
这里的互转,包括RDD与Spark的DF, pandas的DF与Spark的DF
# rdd 与 spark df
df = rdd.toDF(schema=xxx)
df = spark. createDataFrame(rdd, schema=xxx)
rdd = df.rdd
# pandas df 与 spark df
pandf_val = pandf.values.tolist()
pandf_col =list(pandf.columns)
spark_df = spark.createDataFrame(pandf_val, schema=pandf_col)
pandf = spark.df.toPandas()# 注意,数据量不宜太大,否则会爆内存
3.1.4 StructType & StructField
StructType&StructField类用于编程为数据指定框架,并创建复杂的列(例如嵌套结构,数组和map列)
StructField定义DataFrame的列类型,StructType是一个StructField的集合, 常用的数据列类型,常用的嵌套结构:
structureData =[(("James","","Smith"),"36636","M",3100),(("Michael","Rose",""),"40288","M",4300),(("Robert","","Williams"),"42114","M",1400),(("Maria","Anne","Jones"),"39192","F",5500),(("Jen","Mary","Brown"),"","F",-1)]
structureSchema = StructType([
StructField('name', StructType([
StructField('firstname', StringType(),True),
StructField('middlename', StringType(),True),
StructField('lastname', StringType(),True)])),
StructField('id', StringType(),True),
StructField('gender', StringType(),True),
StructField('salary', IntegerType(),True)])
df2 = spark.createDataFrame(data=structureData,schema=structureSchema)
df2.printSchema()
df2.show(truncate=False)
注意: ArrayType或者MapType, 也要为每个元素指定类型
arrayStructureSchema = StructType([
StructField('name', StructType([
StructField('firstname', StringType(),True),
StructField('middlename', StringType(),True),
StructField('lastname', StringType(),True)])),
StructField('hobbies', ArrayType(StringType()),True),
StructField('properties', MapType(StringType(),StringType()),True)])# 更复杂
types.StructField("properties",
types.ArrayType(
types.StructType([
types.StructField("property_name", types.StringType(),False),
types.StructField("property_value", types.StringType(),False),])),False,),
types.StructField("info",
types.StructType([
types.StructField("task", types.StringType(),False),
types.StructField("operator", types.StringType(),False),
types.StructField("date", types.StringType(),False),]),False,),
所以,要想创建复杂的DataFrame结构,这种复合嵌套类型有时候还是比较头疼的。
3. 2 DataFrame操作
这就是重头戏了, DataFrame的操作才是大数据处理的精髓,由于各种骚操作众多,先整理常用的,然后慢慢学习,慢慢补充。
3.2.1 基础查
宏观的说,DataFrame中也会分为Transformation和action两种操作:
拿到一个dataframe,我们和pandas一样,需要先看看他有哪些列,多少数据, 没列是什么类型等,所以下面这几个函数非常常用:
# 显示数据结构, 每一列的数据类型等
df.printSchema()# 显示前n条数据
df.show(n, truncate=False)# 统计总量
df.count()# 列名
df.columns
# 提取部分列 select,可以提取单列,多列, 按照索引提取列等
df.select('SepalLength','SepalWidth').show()from pyspark.sql.functions import col
df.select(col("firstname"),col("lastname")).show()
df.select(df.colRegex("`^.*name*`")).show()# 正则表达式
df.select("*").show()# 所有列# 类型是map的
df2.select("name.firstname","name.lastname").show(truncate=False)# 统计信息 describe
df.describe().show()#计算某一列的描述信息
df.describe('cls').show()# distinct
df.select('cls').distinct().count()# 类似pandas的nunique
df.select('cls').distinct().collect()# pandas的unique
有了这些,就能差不多知道DataFame的数据量级,规模,有哪些列等,然后做数据处理的时候,首先的一个小窍门就是看能不能通过一些条件,先过滤出用到的数据以及列等,把数据规模降下去,因为大部分场景下,其实用不到所有数据的。
3.2.2 列操作
3.2.2.1 根据列名做简单操作
这里整理常用的对列操作的函数
# 对列起别名alias
df.select(df.fname.alias("first_name"), df.lname.alias("last_name")).show()# 修改列的类型cast
df.select(df.fname,df.id.cast("int")).printSchema()#排序 asc, desc
df.sort(df.fname.asc()).show()
df.sort(df.fname.desc()).show()# 根据列过滤行 filter
df.filter(df.id.between(100,300)).show()# 这个很常用, 把大数据先按照条件过滤掉一些df.filter(() & () & ())
df.filter(df.fname.contains("Cruise")).show()
df.filter(df.fname.startswith("T")).show()
df.filter(df.fname.endswith("Cruise")).show()
df.filter(df.id.between(100,300)).show()# 看某列是否有空 isNULL
df.filter(df.lname.isNull()).show()
df.filter(df.lname.isNotNull()).show()# 模糊匹配#like , rlike
df.select(df.fname,df.lname,df.id).filter(df.fname.like("%om"))# 子集,包含
f.select(df.fname.substr(1,2).alias("substr")).show()
li=["100","200"]
df.select(df.fname,df.lname,df.id).filter(~df.id.isin(li)).show()
testDF.select('cls').subtract(trainDF.select('cls'))#删除一列
df.drop('cls').show()
3.2.2.2 withColumn操纵
这也是操作列的一个大杀器, 出场率极高, 是一个用于改变列值,增加新列,修改列的类型等的一个transformation 操作。
import pyspark.sql.functions as F # pyspark的内置函数包# 修改列的类型 cast
df.withColumn("salary",F.col("salary").cast("Integer")).show()# 通过已经有的列更新列的值,简单版,复杂的得需要自定义函数
df.withColumn("salary",F.col("salary")*100).show()# 增加新列
df.withColumn("CopiedColumn",F.col("salary")*-1).show()
df.withColumn("Country", F.lit("USA")).show()# lit函数用来用常量来作为列的默认值# 修改列名
df.withColumnRenamed("gender","sex").show(truncate=False)
3.2.3 行操作
关于行操作,这里第一点就是,不像pandas的DataFrame,spark的DataFrame不能直接iloc这样的索引去拿行里面的每条记录,因为这哥俩内部存储不一样,后者并不是把整个结构表完整存储。 sparkDataFrame这里只能是show,完事collect()转成列表,才可以通过索引去拿。
3.2.3.1 去重采样排序
行这里在DataFrame上,常用的会是去重操作和排序显示,采样等:
# 最直接的去重
distinctDF = df.distinct()
df2 = df.dropDuplicates()# 还可以指定列值去重
dropDisDF = df.dropDuplicates(["department","salary"])# 排序显示orderBy
df.sort(df.department.asc(),df.state.desc()).show(truncate=False)
df.sort(col("department").asc(),col("state").desc()).show(truncate=False)
df.orderBy(col("department").asc(),col("state").desc()).show(truncate=False)# 采样sample(withReplacement, fraction, seed=None) 是否包含重复样本, 抽样比例, seed# 可以从大的数据集中随即采样一个子集
df.sample(True,0.2,2022)# 分成抽样sampleBy -> sampleBy(col, fractions, seed=None)
df2.sampleBy("key",{0:0.1,1:0.2},0).collect())
3.2.3.2 填充与透视
填充缺失值, 这里有两个函数,结果是一样的:
df.fillna(value, subset=None)# df.fillna(value="")
df.nan.fill(value, subset=None)# df.na.fill(value=0,subset=["population"])
df.na.fill("unknown",["city"]).na.fill("",["type"]).show()
pivot函数将数据的一个列的列值转成多个列,然后进行一些聚合统计的操作,生成透视表。
这里需要一个例子来看,假设我的dataFrame长这样:
我想统计下,对于每个产品, 各个国家的生产总量。就可以根据产品分组,然后把国家这一列透视,聚合Amount
df.groupBy('Product').pivot('Country').sum('Amount').show()# 如果想看每个国家对于每种产品的生产总价值
df.groupBy('Country').pivot('Product').sum('Amount').show()
结果如下:
那么,如果想把这个操作返回去呢? 可以用unpivot的操作, 这里用了sql里面的stack操作,相当于把上面的按照行的方向,一个个的堆叠起来, 这里可以选择自己想要的国家列
from pyspark.sql.functions import expr
unpivotExpr ="stack(3, 'Canada', Canada, 'China', China, 'Mexico', Mexico) as (Country,Total)"
unPivotDF = pivotDF.select("Product", expr(unpivotExpr)) \
.where("Total is not null")
unPivotDF.show(truncate=False)
结果如下:
3.2.3.3 map与flatMap范式
如果转成rdd的话,这里常用的依然是map, flatMap操作, 去处理。这里根据一个实际场景,整理两个范式
场景: 假设我有一个dataframe,每条记录是用户每天玩手机的时间段,有开始和结束时间, 我现在感觉这个时间段的粒度太粗了,想切分成1分钟为单位的时间段,这样后面可以和其他的表做join得到更多的信息,怎么做?
这里的第一个范式flatMap:
df = spark.createDataFrame([('zhangsan','kuaishou',198379875385798,09989842198934),('lisi','douyin',928734817893784,397817587835),('wangwu','bzhan',209187901389,0189378974831)],['name','app','start','end')# 时间切割函数import typing
deflower(timestamp:int, step:int):return timestamp // step * step
defupper(timestamp:int, step:int):if timestamp % step ==0:return timestamp
return timestamp // step * step + step
defslice_time_range(start:int, end:int, step:int)-> typing.List[int]:if start == end and start % step ==0:return[start, start + step]
new_start = lower(start, step)
new_end = upper(end, step)returnlist(range(new_start, new_end+1, step))defslice_row(row):
_list =[]
start, end = row['start'], row['end']
time_slice_list = slice_time_range(start, end, step=60*10**9)iflen(time_slice_list)>1:
temp =[row['name', row['app'], time_slice_list[0], time_slice_list[1]]
_list.append(temp)return _list
slice_df = df.rdd.flatMap(slice_row).toDF(schema=df.schema)# 只是范式,如果slice_row这里处理最终返回一条记录,那就是map操作
这样,就把大的时间片段切割成了更细粒度的片段了。更新粒度有什么好处呢? 可以让数据处理变得更加灵活,比如有一个表是用户玩某app的时间列表,另一个表记录了玩另一些app的时间列表,如果想把这俩表jion发现时间段对不上怎么办? 起止和终止时间不一样咋办? 这时候,就可以采用“合久必分,分久必合”的思路, 先把各自的时间段切割成更细粒度,比如1s,然后肯定就有重合了,拼接起来之后,再把1s的时间片段合并起来得到一段时间。 所以这里的第二个范式就是,把上面的slice_df再合并起来。
# 时间合并defg(df):
sort_df = df.sort_values('start')
result_list =[]
start_idx =0while start_idx < sort_df.shape[0]:
item_dict =dict(sort_df.iloc[start_idx])
end_idx = start_idx +1while end_idx < sort_df.shape[0]and sort_df.iloc[end_idx]['start']- sort_df.iloc[
end_idx -1]['end']<=60*10**9:
end_idx +=1
item_dict['end']= sort_df.iloc[end_idx -1]['end']
result_list.append(item_dict)
start_idx = end_idx
result = pd.DataFrame(result_list)return result
df = slice_df.groupby('name').applyInPandas(g, slice_df.schema)
3.2.4 分组聚合
3.2.4.1 常用分组操作
这个又是一个很重要的操作。大数据分析中经常用到。GroupBy函数使用一些列值对数据分组, 并将每组数据进行聚合得到分组聚合后的结果,常用于统计。
df.groupby('cls').agg({'SepalWidth':'mean','SepalLength':'max'}).show()# avg(), count(), countDistinct(), first(), kurtosis(),# max(), mean(), min(), skewness(), stddev(), stddev_pop(),# stddev_samp(), sum(), sumDistinct(), var_pop(), var_samp() and variance()
df.groupBy("department").count()
df.groupBy("department") \
.agg(sum("salary").alias("sum_salary"), \
avg("salary").alias("avg_salary"), \
sum("bonus").alias("sum_bonus"), \
max("bonus").alias("max_bonus")) \
.where(col("sum_bonus")>=50000) \
.show(truncate=False)
上面是一些统计性的函数,这里还可以结合一些内置函数库里面的函数做一些聚合操作,比如非常常用的,就是根据某列分组,把另一列的值聚合成一个list。
aggregation_df = df.groupby('key_column1','key_column2').agg(
F.last("xxx").alias("xxx"),
F.last("xxx").alias("xxx"),
F.collect_list("xxx").alias("xxx"),)
再就是一些更高级的操作,定义一些自定义函数,然后groupby之后,用applyInPandas对每一组进行操作,类似上面的那个范式。
3.2.4.2 细节剖析
这里整理下,groupby的聚合函数的工作原理, 这里需要对rdd进行重新shuffle的。
具有相同的键的数据, 会放到一个分区上被分组, 一个分区上可以有多个key的group组。这个过程就是shuffle操作完成, 这样相同键的数据在一个分区上之后,就可以用聚合函数进行计算,返回结果了。
3.2.5 dataFrame合并
3.2.5.1 join连接
join连接这里, 就是两个或者是多个DataFrame根据某些字段进行join, 常用的是inner和left
# 中间这个这样写, 两个DataFrame相同的列就会保留一个
merge_df = left_df.join(right_df,['name','id',...],'left')# 如果是下面这种写法, 会把join的列都保留,好处就是两个DataFrame join的列名可以不同
merge_df = left_df.join(right_df,[left_df.name = right_df.name1],'inner')# 还有自连接
empDF.alias("emp1").join(empDF.alias("emp2"), \
col("emp1.superior_emp_id")== col("emp2.emp_id"),"inner") \
.select(col("emp1.emp_id"),col("emp1.name"), \
col("emp2.emp_id").alias("superior_emp_id"), \
col("emp2.name").alias("superior_emp_name")) \
.show(truncate=False)
3.2.5.2 Union操作
用于merge两个或者多个具有相同Schema的DataFrame。其实就是行拼接。
unionDF = df.union(df2).distinct()
3.2.6 自定义函数
这个功能就非常的强大了,可以自定义一些函数,去处理DataFrame的一些列,我一般喜欢的一个操作,就是自定义函数,基于DataFrame的已经有的一些列,去增加一些新列。 这个在pandas的DataFrame就是
apply(lambda x: process(x), axis=1)
的这种操作。
from pyspark.sql.functions import udf, F
@udf(returnType=StringType())defget_new_column(col1, col2, col3..):# 根据传入的列做一些复杂运算return res
df.withColumn("Cureated Name", upperCase(F.col("col1"), F.col("col2"), F.col("col3"))).show(truncate=False)# 如果想处理所有列生成新的列的话,可以考虑mapdeffunc1(x):
firstName=x.firstname
lastName=x.lastname
name=firstName+","+lastName
gender=x.gender.lower()
salary=x.salary*2return(name,gender,salary)
df.rdd.map(lambda x: func1(x)).toDF(schema)
3.2.7 内置函数库
pyspark的内置函数库也是很强大的,这里整理一些目前用到的内置函数, 这些函数都在
pyspark.sql.functions
中。
3.2.7.1 与数据相关
- when: 类似sql里面的case when, then的表达,或者是switch…if then else表达
from pyspark.sql.functions import whendf2 = df.withColumn("new_gender", when(df.gender =="M","Male").when(df.gender =="F","Female").when(df.gender.isNull(),"").otherwise(df.gender))
这个可以直接用sql里面的case when代替from pyspark.sql.functions import expr#Using Case When on withColumn()df3 = df.withColumn("new_gender", expr("CASE WHEN gender = 'M' THEN 'Male' "+"WHEN gender = 'F' THEN 'Female' WHEN gender IS NULL THEN ''"+"ELSE gender END"))
在实际场景中,我这里有一个使用when的需求,就是分组统计给定列里面的非0值的个数, 这个当时卡了我好久,最后才写出了一个代码,记录一下。defcount_non_zero(df, features, grouping):return df.groupBy(grouping).agg(*[F.count(F.when(F.col(c)!=0,1)).alias(f"{c}_no_zero_count")for c in features])
- expr 这个函数的参数是一个sql语法字符串,可以直接解析sql,比如上面这个操作,还有一个就是两行字符串拼接得到新的列
from pyspark.sql.functions import exprdf.withColumn("Name",expr(" col1 ||','|| col2")).show()df.select("increment",expr("cast(increment as string) as str_increment"))df.filter(expr("col1 == col2")).show()
- lit 增加新列的时候, 给新列赋予常量的默认值
from pyspark.sql.functions import when, lit, coldf2 = df.select(col("EmpId"),col("Salary"),lit("1").alias("lit_value1"))df3 = df2.withColumn("lit_value2", when(col("Salary")>=40000& col("Salary")<=50000,lit("100")).otherwise(lit("200")))
- split 这个用于分割字符串列生成新的列,列类型是列表
df2 = df.select(split(col("name"),",").alias("NameArray")).drop("name")
反操作concat_ws, 把列表类型的列,用字符串起来得到一个字符串, 类似于','.join([])``````df2 = df.withColumn("languagesAtSchool", concat_ws(",",col("languagesAtSchool")))
- explode 这个函数, 用于列表类型的数据,会根据每个元素展开成行
+-------+-----------------------------------+|name |subjects |+-------+-----------------------------------+|James |[[Java, Scala, C++],[Spark, Java]]||Michael|[[Spark, Java, C++],[Spark, Java]]||Robert |[[CSharp, VB],[Spark, Python]]|+-------+-----------------------------------+# 如果想把这些学科,展开成行,可以先用flatten函数, 把这种二维列表展平成一维,然后再用explode函数df.select(df.name,flatten(df.subjects)).show(truncate=False)df.select(df.name,explode(df.subjects)).show(truncate=False)+-------+--------------+|name |subjects |+-------+--------------+|James |Java ||James |Scala ||James |C++|+-------+--------------+
3.2.7.2 与时间相关
3.2.7.3 聚合统计相关
3.2.7.4 json相关
3.2.8 DataFrame写入
写入这里,先记录一个partitionBy函数,这个是写入数据的时候,可以指定分区,在数据湖的存储中分区可以高效检索,所谓分区,其实就是根据指定的列,把数据分目录存储,这样查询的时候,只需要查对应目录中小范围数据,而不是全量查询,大数据情况下,可以大大提高检索效率。比如写入csv时,可以指定分区
#partitionBy()
df.write.option("header",True) \
.partitionBy("state") \
.mode("overwrite") \
.csv("/tmp/zipcodes-state")
这样存储的时候,就不是把所有数据存到一个文件了,而是根据starte的取值个数,存成了好多个文件。
这样检索的时候,如果指定starte=AZ,那么只拿出AZ这个文件的数据来就行,就不用全量找了。 所以一般分区的时候,会把记录的主键字段,作为分区字段。
写入这里,可以csv, parquet和json等,但我现在一般用不到写入数据,所以先不整理这块。
4. 小总
这篇文章,主要是整理目前自己用到的在pyspark下用到的数据处理的一些操作,操作很多,不需要记,而是想日后方便查阅,因为之前这块知识都是零零散散不成体系,导致每次用到还得现查百度, 浪费掉很多时间,所以索性拿个周末,把这些东西整理到一块,以后再遇到新的,再补充。
版权归原作者 翻滚的小@强 所有, 如有侵权,请联系我们删除。