0


实验三 Spark SQL基础编程

实验三 Spark SQL基础编程

1.实验目的

  1. 掌握 Spark SQL 的基本编程方法;

  2. 熟悉 RDD 到 DataFrame 的转化方法;

  3. 熟悉利用 Spark SQL 管理来自不同数据源的数据。

2.实验内容

1.Spark SQL 基本操作 将下列 JSON 格式数据复制到 Linux 系统中,并保存命名为 employee.json。

{ "id":1, "name":"Ella", "age":36 }

{ "id":2, "name":"Bob", "age":29 }

{ "id":3, "name":"Jack", "age":29 }

{ "id":4, "name":"Jim", "age":28 }

{ "id":4, "name":"Jim", "age":28 }

{ "id":5, "name":"Damon" }

{ "id":5, "name":"Damon" }

为 employee.json 创建 DataFrame,并写出 Python 语句完成下列操作:

(1)查询所有数据;

(2)查询所有数据,并去除重复的数据;

(3)查询所有数据,打印时去除 id 字段;

(4)筛选出 age>30 的记录;

(5)将数据按 age 分组;

(6)将数据按 name 升序排列;

(7)取出前 3 行数据;

(8)查询所有记录的 name 列,并为其取别名为 username;

(9)查询年龄 age 的平均值;

(10)查询年龄 age 的最小值。

创建json文件

echo '{ "id":1, "name":"Ella", "age":36 }' > employee.json
echo '{ "id":2, "name":"Bob", "age":29 }' >> employee.json
echo '{ "id":3, "name":"Jack", "age":29 }' >> employee.json
echo '{ "id":4, "name":"Jim", "age":28 }' >> employee.json
echo '{ "id":4, "name":"Jim", "age":28 }' >> employee.json
echo '{ "id":5, "name":"Damon" }' >> employee.json
echo '{ "id":5, "name":"Damon" }' >> employee.json

参考代码:关键代码如下。

#导入
....
//创建sprak对象
....

df = spark.read.json("employee.json")

df.show()

df_distinct = df.distinct()
df_distinct.show()

df_without_id = df.select("name", "age")
df_without_id.show()

df_age_gt_30 = df.filter(col("age") > 30)
df_age_gt_30.show()

df_grouped_by_age = df.groupBy("age").count()
df_grouped_by_age.show()

df_sorted_by_name = df.orderBy("name")
df_sorted_by_name.show()
df_top_3 = df.limit(3)
df_top_3.show()

df_with_username = df.select(col("name").alias("username"))
df_with_username.show()

print(df.select(avg("age")).first()[0])

print(df.select(min("age")).first()[-3] )

//关闭
...

2.编程实现将 RDD 转换为 DataFrame 源文件内容如下(包含 id,name,age):

1,Ella,36

2,Bob,29

3,Jack,29

请将数据复制保存到 Linux 系统中,命名为 employee.txt,实现从 RDD 转换得到 DataFrame,并按“id:1,name:Ella,age:36”的格式打印出 DataFrame 的所有数据。

请写出 程序代码。

关键代码如下:

# 创建SparkSession
.......

# 定义源文件的结构类型

.......
# 读取源文件并创建RDD
....

# 将RDD转换为DataFrame
df = spark.createDataFrame(rdd, schema)

# 打印DataFrame的所有数据
df.show(truncate=False)

# 将DataFrame的数据按指定格式打印出来
df_string = df.rdd \
    .map(lambda row: f"id:{row['id']},name:{row['name']},age:{row['age']}") \
    .collect()

for data in df_string:
    print(data)

.......

**3. 编程实现利用 DataFrame 读写 MySQL **

(1)在 MySQL 数据库中新建数据库 sparktest,再创建表 employee,包含如表 1 所示 的两行数据

表 1 employee

(2)配置 Spark 通过 JDBC 连接数据库 MySQL,编程实现利用 DataFrame 插入如表 2 所 示的两行数据到 MySQL 中,最后打印出 age 的最大值和 age 的总和。

表 2 employee

sql语句:

CREATE DATABASE sparktest;

USE sparktest;

CREATE TABLE employee (
  id INT PRIMARY KEY,
  name VARCHAR(50),
  gender CHAR(1),
  age INT
);

INSERT INTO employee (id, name, gender, age) VALUES
(1, 'Alice', 'F', 22),
(2, 'John', 'M', 25);

python代码

from pyspark.sql import SparkSession

# 创建SparkSession
spark = SparkSession.builder \
    .appName("MySQL Example") \
    .getOrCreate()

# 定义MySQL连接信息
mysql_host = "localhost"
mysql_port = "3306"
mysql_database = "sparktest"
mysql_table = "employee"
mysql_username = "root"

mysql_password = "root"

# 创建DataFrame
data = [("3", "Mary", "F", 26), ("4", "Tom", "M", 23)]
columns = ["id", "name", "gender", "age"]
df = spark.createDataFrame(data, columns)

# 写入MySQL数据库
df.write.format("jdbc") \
    .option("url", f"jdbc:mysql://{mysql_host}:{mysql_port}/{mysql_database}&useSSL=false") \
    .option("dbtable", mysql_table) \
    .option("user", mysql_username) \
    .option("password", mysql_password) \
    .mode("append") \
    .save()

# 从MySQL数据库读取数据
df_mysql = spark.read.format("jdbc") \
    .option("url", f"jdbc:mysql://{mysql_host}:{mysql_port}/{mysql_database}") \
    .option("dbtable", mysql_table) \
    .option("user", mysql_username) \
    .option("password", mysql_password) \
    .load()

# 计算age的最大值和总和
max_age = df_mysql.selectExpr("max(age)").collect()[0][0]
sum_age = df_mysql.selectExpr("sum(age)").collect()[0][0]

# 打印结果
print("Max Age:", max_age)
print("Sum of Age:", sum_age)

# 关闭SparkSession
spark.stop()

3.参考代码

https://download.csdn.net/download/weixin_41957626/87780630

标签: spark sql 大数据

本文转载自: https://blog.csdn.net/weixin_41957626/article/details/130661798
版权归原作者 简单点了 所有, 如有侵权,请联系我们删除。

“实验三 Spark SQL基础编程”的评论:

还没有评论