0


用Spark在大数据平台DataBricks轻松处理数据

Apache Spark是一个强大的开源分布式计算系统,专为大规模数据处理而设计。而DataBricks则提供了一个基于云的环境,使得在Spark上处理数据变得更加高效和便捷。本文将介绍如何在DataBricks平台上使用Spark轻松处理大数据。DataBricks是一个基于云的大数据处理平台,它提供了一个完整的环境,用于运行和管理Spark应用程序。DataBricks的界面用户友好,支持多种数据源,并提供了丰富的数据可视化工具。此外,DataBricks还提供了协作功能,使团队成员可以共享和协作处理数据。

如何用免费资源启动Azure DataBricks 工作区,请参照之前的一个教程:

利用 Azure Data Bricks的免费资源学习云上大数据-CSDN博客

一、使用以下 PySpark 代码将数据加载到数据帧

CSV 文件的第一行包含列名,Spark 能够根据每一列所包含的数据推断出其数据类型。 还可以指定数据的显式架构,这在数据文件中不包含列名时很有用,例如此 CSV 示例:

1、下载示例文件:

样本数据下载地址:【免费】AzureDataBricks-sample-Data资源-CSDN文库

Notebook试验文件下载地址:【免费】DataBricks-Python-Sample.ipynb资源-CSDN文库

2、上传同样本数据文件到DataBricks DBFS系统中

在Azure DataBricks新建笔记本,然后选择上传数据

将样本数据Products.csv 拖动上传到这里

3、上传notebook示例试验文件(此步骤可不做跳过)

如下图:将刚刚下载的DataBricks-Python-Sample.ipynb拖动上传

4、加载数据到数据帧

加载数据后,可以直接在笔记本粘贴下面命令,逐步复制来执行看结果

# 请用你的azure portal 登录的邮箱替换掉 [email protected]
 df = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/[email protected]/products.csv")

执行结果:

二、浏览原始表中的数据

display(df1)

执行结果:

可以使用以下 PySpark 代码将数据加载到数据帧中并显示前 10 行:

# 请用你的azure portal 登录的邮箱替换掉 [email protected]
df = spark.read.load('dbfs:/FileStore/shared_uploads/[email protected]/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

执行结果:

三、指定Spark数据帧架构

以下 PySpark 示例演示了如何指定要从名为 product-data.csv 的文件加载的数据帧的架构

# 请用你的Azure 登录用户邮箱替换 [email protected] 
from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('dbfs:/FileStore/shared_uploads/[email protected]/products.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

执行结果:

四、调用sparkAPI 对数据帧进行筛选和分组

pricelist_df = df.select("ProductID", "ListPrice")

将多个方法“链接”在一起来执行一系列操作,从而生成转换后的数据帧。 例如,此示例代码将 select 和 where 方法链接在一起来创建新的数据帧,其中包含用于“山地自行车”或“公路自行车”类别的产品的 ProductName 和 ListPrice 列:

bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

执行结果:

要对数据进行分组和聚合,可以使用 groupBy 方法和聚合函数。 例如,以下 PySpark 代码计算每个类别的产品数量:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

执行结果:

五、在 Spark 目录中创建数据库对象

1、可以使用数据帧的 saveAsTable 方法将其保存为表。

2、可以使用 spark.catalog.createTable 方法创建空表。 表是元数据结构,该结构会将其基础数据存储在与目录关联的存储位置。 删除表也会删除其基础数据。

3、 可以使用 spark.catalog.createExternalTable 方法创建外部表。 外部表定义目录中的元数据,但从外部存储位置获取其基础数据;通常是数据湖中的文件夹。 删除外部表不会删除基础数据。

#1 
df.write.saveAsTable("products1")
#2
df.createOrReplaceTempView("products2")

六、用sql语句进行查询

1、直接用SQL语句进行查询

%sql
SELECT ProductName, ListPrice
FROM products1
WHERE Category = 'Touring Bikes';

执行结果:

2、用sql 语句汇总查询

%sql
SELECT Category, COUNT(ProductID) AS ProductCount
FROM products1
GROUP BY Category
ORDER BY Category

执行结果:

3、调用spark.sql 查询

bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
                      FROM products \
                      WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)

执行结果:


本文转载自: https://blog.csdn.net/weixin_39108752/article/details/135623484
版权归原作者 AI普惠行者 所有, 如有侵权,请联系我们删除。

“用Spark在大数据平台DataBricks轻松处理数据”的评论:

还没有评论