0


spark实验三 Spark SQL编程初级实践

Spark SQL基本操作

将下列json数据复制到你的ubuntu系统/usr/local/spark下,并保存命名为employee.json。

{ "id":1 ,"name":" Ella","age":36 }
{ "id":2,"name":"Bob","age":29 }
{ "id":3 ,"name":"Jack","age":29 }

首先为employee.json创建DataFrame,并写出Python语句完成下列操作:

创建DataFrame

答案:

spark=SparkSession.builder().getOrCreate()

df = spark.read.json("file:///usr/local/spark/employee.json")

  1. 查询DataFrame的所有数据

答案:>>> df.show()

  1. 查询所有数据,并去除重复的数据

答案:>>> df.distinct().show()

  1. 查询所有数据,打印时去除id字段

答案:>>> df.drop("id").show()

  1. 筛选age>20的记录

答案:>>> df.filter(df.age > 30 ).show()

  1. 将数据按name分组

答案:>>> df.groupBy("name").count().show()

  1. 将数据按name升序排列

答案:>>> df.sort(df.name.asc()).show()

取出前3行数据

答案:>>> df.take(3) 或python> df.head(3)

查询所有记录的name列,并为其取别名为username

答案:>>> df.select(df.name.alias("username")).show()

查询年龄age的平均值

答案:>>> df.agg({"age": "mean"}).show()

查询年龄age的最大值

答案:>>> df.agg({"age": "max"}).show()

编程实现将RDD转换为DataFrame

方法一:利用反射来推断包含特定类型对象的RDD的schema,适用对已知数据结构的RDD转换;

方法二:使用编程接口,构造一个schema并将其应用在已知的RDD上。

练习

编写独立应用程序实现,将RDD转换为DataFrame,分别使用反射机制和编程方式,打印出记录的信息

1,Ella,36

2,Bob,29

3,Jack,29

#/usr/local/spark
from pyspark.conf import SparkConf
from pyspark.sql.session import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import Row
from pyspark.sql import SQLContext
if __name__ == "__main__":
        sc = SparkContext("local","Simple App")
        spark=SparkSession(sc)
        peopleRDD = spark.sparkContext.textFile("file:/usr/local/spark/employee.txt")
        rowRDD = peopleRDD.map(lambda line : line.split(",")).map(lambda attributes : Row(int(attributes[0]),attributes[1],int(attributes[2]))).toDF()
        rowRDD.createOrReplaceTempView("employee")
        personsDF = spark.sql("select * from employee")
        personsDF.rdd.map(lambda t : "id:"+str(t[0])+","+"Name:"+t[1]+","+"age:"+str(t[2])).foreach(print)

编写独立应用程序实现,配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入下列数据到MySQL,打印出记录的信息、age的最大值和age的总和(数据记录共3行,红色字体第三行为你的真实学号、姓名、姓别、年纪)。

employee表****新增数据

id

name

gender

age

1

Mary

F

26

2

Tom

M

23

id

name

M

10

create database sparktest;

#/usr/local/spark/mysqltest.py
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()

schema = StructType([StructField("id",LongType(),True),StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age",IntegerType(), True)])
employeeRDD = spark.sparkContext.parallelize(["3 Mary F 26","4 Tom M 23","id name M 10"]).map(lambda x:x.split(" "))
rowRDD = employeeRDD.map(lambda p:Row(int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())))

employeeDF = spark.createDataFrame(rowRDD, schema)
prop = {}
prop['user'] = 'root'
prop['password'] = '123456'
prop['driver'] = "com.mysql.jdbc.Driver"
employeeDF.write.jdbc("jdbc:mysql://localhost:3306/employee?useSSL=false",'employee','append', prop)
employeeDF.collect()
employeeDF.agg({"age": "max"}).show()
employeeDF.agg({"age": "sum"}).show()

标签: spark sql 大数据

本文转载自: https://blog.csdn.net/2202_75334392/article/details/136478976
版权归原作者 又迷茫了 所有, 如有侵权,请联系我们删除。

“spark实验三 Spark SQL编程初级实践”的评论:

还没有评论