0


Spark SQL编程初级实践

参考链接

Spark编程: Spark SQL基本操作 2020.11.01_df.agg("age"->"avg")-CSDN博客

RDD编程初级实践-CSDN博客

Spark和Hadoop的安装-CSDN博客

Spark SQL 编程初级实践-CSDN博客

1. Spark SQL基本操作

{ "id":1 , "name":" Ella" , "age":36 }
{ "id":2, "name":"Bob","age":29 }
{ "id":3 , "name":"Jack","age":29 }
{ "id":4 , "name":"Jim","age":28 }
{ "id":4 , "name":"Jim","age":28 }
{ "id":5 , "name":"Damon" }
{ "id":5 , "name":"Damon" }

创建employee.json文件

sudo vim employee.json
cat employee.json

启动spark-shell

cd /usr/local/spark/
./bin/spark-shell

1.1 查询所有数据

import spark.implicits._
val df=spark.read.json("file:home/hadoop/下载/employee.json")
df.show()

import spark.implicits._是Spark的一个工具,帮助 我们将RDD 转换为DataFrame。

spark.read.json是 Apache Spark 中的一个方法,用于从 JSON 文件中读取数据并将其加载到 DataFrame 中。

df.show()用于显示DataFrame中的内容。

1.2 查询所有数据,并去除重复的数据

df.distinct().show()

** distinct()去重。**

1.3 查询所有数据,打印时去除id字段

df.drop(df("id")).show()

** df.drop()用于删除DataFrame中指定的列。**

1.4 筛选出age>30的记录

df.filter(df("age")>30).show()

** df.filter()用于根据指定条件过滤DataFrame中的行。**

1.5 将数据按age分组

df.groupBy(df("age")).count.show()

df.groupBy()用于根据指定的列对DataFrame进行分组。

df.count().show()用于显示分组后的DataFrame的内容。

1.6 将数据按name升序排列

df.sort(df("name").asc).show()

df.sort()用于对DataFrame中的行进行排序(默认升序)。

升序asc

降序desc

这里“Ella”比“Bob”小是因为“Ella”字符串实际上是“ Ella”,所以他的第一个字符不是‘E’而是‘ ’,对应的ASCII,‘E’是69,‘B’是66,‘ ’是32.

1.7 取出前3行数据

df.show(3)

** df.show(n)用于显示DataFrame的前n行。(n超出后会打印原始的大小)**

1.8 查询所有记录的name列,并为其取别名为username

df.select(df("name").as("username")).show()

** df.select()用于选择DataFrame中指定的列。**

1.9 查询年龄age的平均值

df.agg("age"->"avg").show()

** df.agg()用于对DataFrame进行聚合操作。**

avg平均。

1.10 查询年龄age的最小值

df.agg("age"->"min").show()

min最小。

2.编程实现将RDD转换为DataFrame

1,Ella,36
2,Bob,29
3,Jack,29

创建项目

sudo mkdir -p /example/sparkapp6/src/main/scala
cd /example/sparkapp6/src/main/scala

** 创建employee.txt**

sudo vim employee.txt 
cat employee.txt

创建Scala文件

sudo vim ./SimpleApp.scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("MyApp")
      .getOrCreate()
    val peopleRDD = spark.sparkContext.textFile("file:///example/sparkapp6/src/main/scala/employee.txt")
    val schemaString = "id name age"
    val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)
    val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim, attributes(2).trim))
    val peopleDF = spark.createDataFrame(rowRDD, schema)
    peopleDF.createOrReplaceTempView("people")
    val results = spark.sql("SELECT id,name,age FROM people")
    results.map(attributes => "id: " + attributes(0) + "," + "name:" + attributes(1) + "," + "age:" + attributes(2)).show()
  }
}

这个代码没成功,继续往下面看。

** 创建.sbt文件**

sudo vim build.sbt

**这里需要的依赖发生了变化,不改会报错。 **

name := "Simple Project"
version := "1.0"
scalaVersion := "2.13.13"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.1"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.1"

** 打包执行**

/usr/local/sbt/sbt package
spark-submit --class "SimpleApp" ./target/scala-2.13/simple-project_2.13-1.0.jar

直接启动spark-shell(成功运行是看这里)

cd /usr/local/spark/
./bin/spark-shell
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val peopleRDD = spark.sparkContext.textFile("file:///example/sparkapp6/src/main/scala/employee.txt")
val schemaString = "id name age"
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim, attributes(2).trim))
val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF.createOrReplaceTempView("people")
val results = spark.sql("SELECT id,name,age FROM people")
results.map(attributes => "id: " + attributes(0) + "," + "name:" + attributes(1) + "," + "age:" + attributes(2)).show()
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> val peopleRDD = spark.sparkContext.textFile("file:///example/sparkapp6/src/main/scala/employee.txt")
val peopleRDD: org.apache.spark.rdd.RDD[String] = file:///example/sparkapp6/src/main/scala/employee.txt MapPartitionsRDD[10] at textFile at <console>:1

scala> val schemaString = "id name age"
val schemaString: String = id name age

scala> val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
val fields: Array[org.apache.spark.sql.types.StructField] = Array(StructField(id,StringType,true), StructField(name,StringType,true), StructField(age,StringType,true))

scala> val schema = StructType(fields)
val schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,StringType,true),StructField(name,StringType,true),StructField(age,StringType,true))

scala> val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim, attributes(2).trim))
val rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[12] at map at <console>:1

scala> val peopleDF = spark.createDataFrame(rowRDD, schema)
val peopleDF: org.apache.spark.sql.DataFrame = [id: string, name: string ... 1 more field]

scala> peopleDF.createOrReplaceTempView("people")

scala> val results = spark.sql("SELECT id,name,age FROM people")
val results: org.apache.spark.sql.DataFrame = [id: string, name: string ... 1 more field]

scala> results.map(attributes => "id: " + attributes(0) + "," + "name:" + attributes(1) + "," + "age:" + attributes(2)).show()
+--------------------+
|               value|
+--------------------+
|id: 1,name:Ella,a...|
|id: 2,name:Bob,ag...|
|id: 3,name:Jack,a...|
+--------------------+

3.编程实现利用DataFrame读写MySQL的数据

安装MySQL

MySQL :: Download MySQL Connector/J (Archived Versions)

sudo tar -zxf ./mysql-connector-java-5.1.40.tar.gz -C /usr/local
cd /usr/local/
mv mysql-connector-java-5.1.40/ mysql

3.1 在MySQL数据库中新建数据库sparktest,再创建表employee

su root
service mysql start
mysql -u root -p

** 建库**

create database sparktest;

建表

use sparktest;
create table employee(id int(4),name char(20),gender char(4),Age int(4)); 
mysql> use sparktest;
Database changed
mysql> create table employee(id int(4),name char(20),gender char(4),Age int(4)); 
Query OK, 0 rows affected, 2 warnings (0.02 sec)

插入数据

insert into employee values(1,'Alice','F',22);
insert into employee values(2,'John','M',25);
select * from employee;
mysql> insert into employee values(1,'Alice','F',22);
Query OK, 1 row affected (0.01 sec)

mysql> insert into employee values(2,'John','M',25);
Query OK, 1 row affected (0.01 sec)

mysql> select * from employee;

+------+-------+--------+------+
| id   | name  | gender | Age  |
+------+-------+--------+------+
|    1 | Alice | F      |   22 |
|    2 | John  | M      |   25 |
+------+-------+--------+------+
2 rows in set (0.00 sec)

3.2 配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入

标签: spark sql 数据库

本文转载自: https://blog.csdn.net/weixin_64066303/article/details/138357140
版权归原作者 封奚泽优 所有, 如有侵权,请联系我们删除。

“Spark SQL编程初级实践”的评论:

还没有评论