0


SparkSQL简单使用

第1关:SparkSQL初识

任务描述

本关任务:编写一个

sparksql

基础程序。

相关知识

为了完成本关任务,你需要掌握:1. 什么是

SparkSQL
  1. 什么是
    SparkSession
    

什么是SparkSQL

Spark SQL

是用来操作结构化和半结构化数据的接口。 当每条存储记录共用已知的字段集合,数据符合此条件时,

Spark SQL

就会使得针对这些数据的读取和查询变得更加简单高效。具体来说,

Spark SQL

提供了以下三大功能: (1)

Spark SQL

可以从各种结构化数据源(例如

JSON

Parquet

等)中读取数据。

(2)

Spark SQL

不仅支持在

Spark

程序内使用

SQL

语句进行数据查询,也支持从类似商业智能软件

Tableau

这样的外部工具中通过标准数据库连接器(

JDBC/ODBC

)连接

sparkSQL

进行查询。

(3) 当在

Spark

程序内使用

Spark SQL

时,

Spark SQL

支持

SQL

与常规的

Python/Java/Scala

代码高度整合,包括连接

RDD

SQL

表、公开的自定义

SQL

函数接口等。

什么是SparkSession

Spark

中所有功能的入口点都是

SparkSession

类。要创建基本的

SparkSession

,只需使用

SparkSession.builder()

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL基本示例")
.master("local")
.config("spark.some.config.option" , "some-value")
.getOrCreate();
//打印spark版本号
System.out.println(spark.version());

编程要求

请仔细阅读右侧代码,根据方法内的提示,在

Begin - End

区域内进行代码补充,具体任务如下:

  • 打印spark的版本号。

代码实现

package com.educoder.bigData.sparksql;

import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class Test1 {

    public static void main(String[] args) throws AnalysisException {
        /********* Begin *********/
        SparkSession  spark  =  SparkSession 
                  .builder()
                  .appName("Java Spark SQL基本示例")
                  .master("local")
                  .config("spark.some.config.option" , "some-value")
                  .getOrCreate();
 //打印spark版本号
 System.out.println(spark.version());

        /********* End *********/
    }

}

第2关:Dataset创建及使用

编程要求

根据提示,在右侧编辑器补充代码,读取

people.json

文件,过滤

age

23

的数据,并以表格形式显示前

20

Dataset

代码实现

package com.educoder.bigData.sparksql;

import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.SparkSession;

public class Test2 {

    public static void main(String[] args) throws AnalysisException {
        
        SparkSession  spark  =  SparkSession 
                  .builder()
                  .appName("test1")
                  .master("local")
                  .config("spark.some.config.option" , "some-value")
                  .getOrCreate();
        /********* Begin *********/
        
        //读取json,并将Dataset,并注册为SQL临时视图
        spark.read().json("people.json").createOrReplaceTempView("people");

        spark.sql("select * from people where age !='23'").show();

        /********* End *********/
    }

}

第3关:Dataset自定义函数

编程要求

请仔细阅读右侧代码,根据方法内的提示,在

Begin - End

区域内进行代码补充,编写自定义函数类

MyAverage

,用来计算用户薪水平均值,平台已提供了最后的实现:

代码实现

package com.educoder.bigData.sparksql;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class MyAverage extends UserDefinedAggregateFunction {
    
    private static final long serialVersionUID = 1L;
    
    private StructType inputSchema;
    private StructType bufferSchema;

    public MyAverage() {
        /********* Begin *********/
        List<StructField> inputFields = new ArrayList<StructField>();
        inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.       LongType, true));
        inputSchema = DataTypes.createStructType(inputFields);
        List<StructField> bufferFields = new ArrayList<StructField>();
        bufferFields.add(DataTypes.createStructField("sum", DataTypes.LongType, true));
        bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType, true));
        bufferSchema = DataTypes.createStructType(bufferFields);
        /********* End *********/
      }
    
    @Override
    public StructType bufferSchema() {
        /********* Begin *********/
        
        //return null;
        return bufferSchema;
        /********* End *********/
    }

    @Override
    public DataType dataType() {
        /********* Begin *********/
        
        //return null;
        return DataTypes.DoubleType;
        /********* End *********/
    }

    @Override
    public boolean deterministic() {
        // TODO Auto-generated method stub
        return true;
    }

    @Override
    public Object evaluate(Row buffer) {
        /********* Begin *********/
        
        //return null;
        return ((double) buffer.getLong(0)) / buffer.getLong(1);
        /********* End *********/
    }

    @Override
    public void initialize(MutableAggregationBuffer buffer) {
        /********* Begin *********/
        buffer.update(0, 0L);
        buffer.update(1, 0L);
        
        /********* End *********/    

    }

    @Override
    public StructType inputSchema() {
        /********* Begin *********/
        
        //return null;
        return inputSchema;
        /********* End *********/
    }

    @Override
    public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
        /********* Begin *********/
        long mergedSum = buffer1.getLong(0) + buffer2.getLong(0);
        long mergedCount = buffer1.getLong(1) + buffer2.getLong(1);
        buffer1.update(0, mergedSum);
        buffer1.update(1, mergedCount);

        
        
        /********* End *********/
    }

    @Override
    public void update(MutableAggregationBuffer buffer, Row input) {
        /********* Begin *********/
        if (!input.isNullAt(0)) {
        long updatedSum = buffer.getLong(0) + input.getLong(0);
        long updatedCount = buffer.getLong(1) + 1;
        buffer.update(0, updatedSum);
        buffer.update(1, updatedCount);
        }
        
        
        /********* End *********/

    }

}
标签: spark scala 大数据

本文转载自: https://blog.csdn.net/qq_56710665/article/details/128386652
版权归原作者 vincc177 所有, 如有侵权,请联系我们删除。

“SparkSQL简单使用”的评论:

还没有评论