第1关:SparkSQL初识
任务描述
本关任务:编写一个
sparksql
基础程序。
相关知识
为了完成本关任务,你需要掌握:1. 什么是
SparkSQL
- 什么是
。SparkSession
什么是SparkSQL
Spark SQL
是用来操作结构化和半结构化数据的接口。 当每条存储记录共用已知的字段集合,数据符合此条件时,
Spark SQL
就会使得针对这些数据的读取和查询变得更加简单高效。具体来说,
Spark SQL
提供了以下三大功能: (1)
Spark SQL
可以从各种结构化数据源(例如
JSON
、
Parquet
等)中读取数据。
(2)
Spark SQL
不仅支持在
Spark
程序内使用
SQL
语句进行数据查询,也支持从类似商业智能软件
Tableau
这样的外部工具中通过标准数据库连接器(
JDBC/ODBC
)连接
sparkSQL
进行查询。
(3) 当在
Spark
程序内使用
Spark SQL
时,
Spark SQL
支持
SQL
与常规的
Python/Java/Scala
代码高度整合,包括连接
RDD
与
SQL
表、公开的自定义
SQL
函数接口等。
什么是SparkSession
Spark
中所有功能的入口点都是
SparkSession
类。要创建基本的
SparkSession
,只需使用
SparkSession.builder()
。
import org.apache.spark.sql.SparkSession;
SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL基本示例")
.master("local")
.config("spark.some.config.option" , "some-value")
.getOrCreate();
//打印spark版本号
System.out.println(spark.version());
编程要求
请仔细阅读右侧代码,根据方法内的提示,在
Begin - End
区域内进行代码补充,具体任务如下:
- 打印
spark
的版本号。
代码实现
package com.educoder.bigData.sparksql;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class Test1 {
public static void main(String[] args) throws AnalysisException {
/********* Begin *********/
SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL基本示例")
.master("local")
.config("spark.some.config.option" , "some-value")
.getOrCreate();
//打印spark版本号
System.out.println(spark.version());
/********* End *********/
}
}
第2关:Dataset创建及使用
编程要求
根据提示,在右侧编辑器补充代码,读取
people.json
文件,过滤
age
为
23
的数据,并以表格形式显示前
20
行
Dataset
。
代码实现
package com.educoder.bigData.sparksql;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.SparkSession;
public class Test2 {
public static void main(String[] args) throws AnalysisException {
SparkSession spark = SparkSession
.builder()
.appName("test1")
.master("local")
.config("spark.some.config.option" , "some-value")
.getOrCreate();
/********* Begin *********/
//读取json,并将Dataset,并注册为SQL临时视图
spark.read().json("people.json").createOrReplaceTempView("people");
spark.sql("select * from people where age !='23'").show();
/********* End *********/
}
}
第3关:Dataset自定义函数
编程要求
请仔细阅读右侧代码,根据方法内的提示,在
Begin - End
区域内进行代码补充,编写自定义函数类
MyAverage
,用来计算用户薪水平均值,平台已提供了最后的实现:
代码实现
package com.educoder.bigData.sparksql;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
public class MyAverage extends UserDefinedAggregateFunction {
private static final long serialVersionUID = 1L;
private StructType inputSchema;
private StructType bufferSchema;
public MyAverage() {
/********* Begin *********/
List<StructField> inputFields = new ArrayList<StructField>();
inputFields.add(DataTypes.createStructField("inputColumn", DataTypes. LongType, true));
inputSchema = DataTypes.createStructType(inputFields);
List<StructField> bufferFields = new ArrayList<StructField>();
bufferFields.add(DataTypes.createStructField("sum", DataTypes.LongType, true));
bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType, true));
bufferSchema = DataTypes.createStructType(bufferFields);
/********* End *********/
}
@Override
public StructType bufferSchema() {
/********* Begin *********/
//return null;
return bufferSchema;
/********* End *********/
}
@Override
public DataType dataType() {
/********* Begin *********/
//return null;
return DataTypes.DoubleType;
/********* End *********/
}
@Override
public boolean deterministic() {
// TODO Auto-generated method stub
return true;
}
@Override
public Object evaluate(Row buffer) {
/********* Begin *********/
//return null;
return ((double) buffer.getLong(0)) / buffer.getLong(1);
/********* End *********/
}
@Override
public void initialize(MutableAggregationBuffer buffer) {
/********* Begin *********/
buffer.update(0, 0L);
buffer.update(1, 0L);
/********* End *********/
}
@Override
public StructType inputSchema() {
/********* Begin *********/
//return null;
return inputSchema;
/********* End *********/
}
@Override
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
/********* Begin *********/
long mergedSum = buffer1.getLong(0) + buffer2.getLong(0);
long mergedCount = buffer1.getLong(1) + buffer2.getLong(1);
buffer1.update(0, mergedSum);
buffer1.update(1, mergedCount);
/********* End *********/
}
@Override
public void update(MutableAggregationBuffer buffer, Row input) {
/********* Begin *********/
if (!input.isNullAt(0)) {
long updatedSum = buffer.getLong(0) + input.getLong(0);
long updatedCount = buffer.getLong(1) + 1;
buffer.update(0, updatedSum);
buffer.update(1, updatedCount);
}
/********* End *********/
}
}
版权归原作者 vincc177 所有, 如有侵权,请联系我们删除。