0


SparkSQL简单使用

第1关:SparkSQL初识

任务描述

本关任务:编写一个

  1. sparksql

基础程序。

相关知识

为了完成本关任务,你需要掌握:1. 什么是

  1. SparkSQL
  1. 什么是
    1. SparkSession

什么是SparkSQL

  1. Spark SQL

是用来操作结构化和半结构化数据的接口。 当每条存储记录共用已知的字段集合,数据符合此条件时,

  1. Spark SQL

就会使得针对这些数据的读取和查询变得更加简单高效。具体来说,

  1. Spark SQL

提供了以下三大功能: (1)

  1. Spark SQL

可以从各种结构化数据源(例如

  1. JSON

  1. Parquet

等)中读取数据。

(2)

  1. Spark SQL

不仅支持在

  1. Spark

程序内使用

  1. SQL

语句进行数据查询,也支持从类似商业智能软件

  1. Tableau

这样的外部工具中通过标准数据库连接器(

  1. JDBC/ODBC

)连接

  1. sparkSQL

进行查询。

(3) 当在

  1. Spark

程序内使用

  1. Spark SQL

时,

  1. Spark SQL

支持

  1. SQL

与常规的

  1. Python/Java/Scala

代码高度整合,包括连接

  1. RDD

  1. SQL

表、公开的自定义

  1. SQL

函数接口等。

什么是SparkSession

  1. Spark

中所有功能的入口点都是

  1. SparkSession

类。要创建基本的

  1. SparkSession

,只需使用

  1. SparkSession.builder()

  1. import org.apache.spark.sql.SparkSession;
  2. SparkSession spark = SparkSession
  3. .builder()
  4. .appName("Java Spark SQL基本示例")
  5. .master("local")
  6. .config("spark.some.config.option" , "some-value")
  7. .getOrCreate();
  8. //打印spark版本号
  9. System.out.println(spark.version());

编程要求

请仔细阅读右侧代码,根据方法内的提示,在

  1. Begin - End

区域内进行代码补充,具体任务如下:

  • 打印spark的版本号。

代码实现

  1. package com.educoder.bigData.sparksql;
  2. import org.apache.spark.sql.AnalysisException;
  3. import org.apache.spark.sql.Dataset;
  4. import org.apache.spark.sql.Row;
  5. import org.apache.spark.sql.SparkSession;
  6. public class Test1 {
  7. public static void main(String[] args) throws AnalysisException {
  8. /********* Begin *********/
  9. SparkSession spark = SparkSession
  10. .builder()
  11. .appName("Java Spark SQL基本示例")
  12. .master("local")
  13. .config("spark.some.config.option" , "some-value")
  14. .getOrCreate();
  15. //打印spark版本号
  16. System.out.println(spark.version());
  17. /********* End *********/
  18. }
  19. }

第2关:Dataset创建及使用

编程要求

根据提示,在右侧编辑器补充代码,读取

  1. people.json

文件,过滤

  1. age

  1. 23

的数据,并以表格形式显示前

  1. 20

  1. Dataset

代码实现

  1. package com.educoder.bigData.sparksql;
  2. import org.apache.spark.sql.AnalysisException;
  3. import org.apache.spark.sql.SparkSession;
  4. public class Test2 {
  5. public static void main(String[] args) throws AnalysisException {
  6. SparkSession spark = SparkSession
  7. .builder()
  8. .appName("test1")
  9. .master("local")
  10. .config("spark.some.config.option" , "some-value")
  11. .getOrCreate();
  12. /********* Begin *********/
  13. //读取json,并将Dataset,并注册为SQL临时视图
  14. spark.read().json("people.json").createOrReplaceTempView("people");
  15. spark.sql("select * from people where age !='23'").show();
  16. /********* End *********/
  17. }
  18. }

第3关:Dataset自定义函数

编程要求

请仔细阅读右侧代码,根据方法内的提示,在

  1. Begin - End

区域内进行代码补充,编写自定义函数类

  1. MyAverage

,用来计算用户薪水平均值,平台已提供了最后的实现:

代码实现

  1. package com.educoder.bigData.sparksql;
  2. import java.util.ArrayList;
  3. import java.util.List;
  4. import org.apache.spark.sql.Row;
  5. import org.apache.spark.sql.expressions.MutableAggregationBuffer;
  6. import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
  7. import org.apache.spark.sql.types.DataType;
  8. import org.apache.spark.sql.types.DataTypes;
  9. import org.apache.spark.sql.types.StructField;
  10. import org.apache.spark.sql.types.StructType;
  11. public class MyAverage extends UserDefinedAggregateFunction {
  12. private static final long serialVersionUID = 1L;
  13. private StructType inputSchema;
  14. private StructType bufferSchema;
  15. public MyAverage() {
  16. /********* Begin *********/
  17. List<StructField> inputFields = new ArrayList<StructField>();
  18. inputFields.add(DataTypes.createStructField("inputColumn", DataTypes. LongType, true));
  19. inputSchema = DataTypes.createStructType(inputFields);
  20. List<StructField> bufferFields = new ArrayList<StructField>();
  21. bufferFields.add(DataTypes.createStructField("sum", DataTypes.LongType, true));
  22. bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType, true));
  23. bufferSchema = DataTypes.createStructType(bufferFields);
  24. /********* End *********/
  25. }
  26. @Override
  27. public StructType bufferSchema() {
  28. /********* Begin *********/
  29. //return null;
  30. return bufferSchema;
  31. /********* End *********/
  32. }
  33. @Override
  34. public DataType dataType() {
  35. /********* Begin *********/
  36. //return null;
  37. return DataTypes.DoubleType;
  38. /********* End *********/
  39. }
  40. @Override
  41. public boolean deterministic() {
  42. // TODO Auto-generated method stub
  43. return true;
  44. }
  45. @Override
  46. public Object evaluate(Row buffer) {
  47. /********* Begin *********/
  48. //return null;
  49. return ((double) buffer.getLong(0)) / buffer.getLong(1);
  50. /********* End *********/
  51. }
  52. @Override
  53. public void initialize(MutableAggregationBuffer buffer) {
  54. /********* Begin *********/
  55. buffer.update(0, 0L);
  56. buffer.update(1, 0L);
  57. /********* End *********/
  58. }
  59. @Override
  60. public StructType inputSchema() {
  61. /********* Begin *********/
  62. //return null;
  63. return inputSchema;
  64. /********* End *********/
  65. }
  66. @Override
  67. public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
  68. /********* Begin *********/
  69. long mergedSum = buffer1.getLong(0) + buffer2.getLong(0);
  70. long mergedCount = buffer1.getLong(1) + buffer2.getLong(1);
  71. buffer1.update(0, mergedSum);
  72. buffer1.update(1, mergedCount);
  73. /********* End *********/
  74. }
  75. @Override
  76. public void update(MutableAggregationBuffer buffer, Row input) {
  77. /********* Begin *********/
  78. if (!input.isNullAt(0)) {
  79. long updatedSum = buffer.getLong(0) + input.getLong(0);
  80. long updatedCount = buffer.getLong(1) + 1;
  81. buffer.update(0, updatedSum);
  82. buffer.update(1, updatedCount);
  83. }
  84. /********* End *********/
  85. }
  86. }
标签: spark scala 大数据

本文转载自: https://blog.csdn.net/qq_56710665/article/details/128386652
版权归原作者 vincc177 所有, 如有侵权,请联系我们删除。

“SparkSQL简单使用”的评论:

还没有评论