Spark SQL基本操作
将下列json数据复制到你的ubuntu系统/usr/local/spark下,并保存命名为employee.json。
{ "id":1 ,"name":" Ella","age":36 }
{ "id":2,"name":"Bob","age":29 }
{ "id":3 ,"name":"Jack","age":29 }
首先为employee.json创建DataFrame,并写出Python语句完成下列操作:
创建DataFrame
答案:
spark=SparkSession.builder().getOrCreate()
df = spark.read.json("file:///usr/local/spark/employee.json")
查询DataFrame的所有数据
答案:>>> df.show()
查询所有数据,并去除重复的数据
答案:>>> df.distinct().show()
查询所有数据,打印时去除id字段
答案:>>> df.drop("id").show()
筛选age>20的记录
答案:>>> df.filter(df.age > 30 ).show()
将数据按name分组
答案:>>> df.groupBy("name").count().show()
将数据按name升序排列
答案:>>> df.sort(df.name.asc()).show()
取出前3行数据
答案:>>> df.take(3) 或python> df.head(3)
查询所有记录的name列,并为其取别名为username
答案:>>> df.select(df.name.alias("username")).show()
查询年龄age的平均值
答案:>>> df.agg({"age": "mean"}).show()
查询年龄age的最大值
答案:>>> df.agg({"age": "max"}).show()
编程实现将RDD转换为DataFrame
方法一:利用反射来推断包含特定类型对象的RDD的schema,适用对已知数据结构的RDD转换;
方法二:使用编程接口,构造一个schema并将其应用在已知的RDD上。
练习
编写独立应用程序实现,将RDD转换为DataFrame,分别使用反射机制和编程方式,打印出记录的信息
1,Ella,36
2,Bob,29
3,Jack,29
#/usr/local/spark
from pyspark.conf import SparkConf
from pyspark.sql.session import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import Row
from pyspark.sql import SQLContext
if __name__ == "__main__":
sc = SparkContext("local","Simple App")
spark=SparkSession(sc)
peopleRDD = spark.sparkContext.textFile("file:/usr/local/spark/employee.txt")
rowRDD = peopleRDD.map(lambda line : line.split(",")).map(lambda attributes : Row(int(attributes[0]),attributes[1],int(attributes[2]))).toDF()
rowRDD.createOrReplaceTempView("employee")
personsDF = spark.sql("select * from employee")
personsDF.rdd.map(lambda t : "id:"+str(t[0])+","+"Name:"+t[1]+","+"age:"+str(t[2])).foreach(print)
编写独立应用程序实现,配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入下列数据到MySQL,打印出记录的信息、age的最大值和age的总和(数据记录共3行,红色字体第三行为你的真实学号、姓名、姓别、年纪)。
employee表****新增数据
id
name
gender
age
1
Mary
F
26
2
Tom
M
23
id
name
M
10
create database sparktest;
#/usr/local/spark/mysqltest.py
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
schema = StructType([StructField("id",LongType(),True),StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age",IntegerType(), True)])
employeeRDD = spark.sparkContext.parallelize(["3 Mary F 26","4 Tom M 23","id name M 10"]).map(lambda x:x.split(" "))
rowRDD = employeeRDD.map(lambda p:Row(int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())))
employeeDF = spark.createDataFrame(rowRDD, schema)
prop = {}
prop['user'] = 'root'
prop['password'] = '123456'
prop['driver'] = "com.mysql.jdbc.Driver"
employeeDF.write.jdbc("jdbc:mysql://localhost:3306/employee?useSSL=false",'employee','append', prop)
employeeDF.collect()
employeeDF.agg({"age": "max"}).show()
employeeDF.agg({"age": "sum"}).show()
版权归原作者 又迷茫了 所有, 如有侵权,请联系我们删除。