0


在Kotlin中使用Spark SQL的UDF和UDAF函数

1. 项目结构与依赖

1.1 项目依赖

使用gradle:

在项目的build.gradle.kts添加

  1. dependencies {
  2. implementation("org.apache.spark:spark-sql_2.12:3.3.1")
  3. }

使用maven:

在模块的pom.xml中添加

  1. <dependency>
  2. <groupId>org.apache.spark</groupId>
  3. <artifactId>spark-sql_2.12</artifactId>
  4. <version>3.3.1</version>
  5. </dependency>

2. UDF的使用与实现

UDF,即用户自定义函数,允许用户在SQL查询中使用自定义的函数。下面案例做了一个简单的案例,将首字母变为大写。

2.1 数据源

准备数据源使用JSON数据作为数据格式,保存到项目的根路径下的data/user.txt文件。

  1. {"name":"zhangsan","age":19,"gender":"boy"}
  2. {"name":"lisi","age":20,"gender":"boy"}
  3. {"name":"wangwu","age":21,"gender":"boy"}
  4. {"name":"zhaoliu","age":22,"gender":"boy"}
  5. {"name":"sunqi","age":23,"gender":"boy"}
  6. {"name":"zhouba","age":24,"gender":"boy"}
  7. {"name":"wujiu","age":25,"gender":"boy"}
  8. {"name":"zhengshi","age":26,"gender":"boy"}

2.2 代码示例

  1. import org.apache.spark.sql.SparkSession
  2. import org.apache.spark.sql.api.java.UDF1
  3. import org.apache.spark.sql.types.DataTypes
  4. import java.util.*
  5. class SparkSQL_UDF {
  6. fun f1() {
  7. val sparkSession = SparkSession.builder()
  8. .master("local")
  9. .appName("Kotlin Spark UDF")
  10. .orCreate
  11. // 读取JSON数据并创建视图
  12. sparkSession.read().json("data/user.txt")
  13. .createOrReplaceTempView("user")
  14. // 注册UDF函数,将名字的首字母大写
  15. sparkSession.udf().register("nameHeaderUpper",
  16. UDF1 { name: String ->
  17. name.substring(0, 1).uppercase(Locale.getDefault()) + name.substring(1)
  18. },
  19. DataTypes.StringType)
  20. // 使用注册的UDF函数进行SQL查询
  21. sparkSession.sql("select nameHeaderUpper(name) as name from user").show()
  22. sparkSession.stop()
  23. }
  24. companion object {
  25. @JvmStatic
  26. fun main(args: Array<String>) {
  27. SparkSQL_UDF().f1()
  28. }
  29. }
  30. }
  31. /*输出结果
  32. +---------------------+
  33. |nameHeaderUpper(name)|
  34. +---------------------+
  35. | Zhangsan|
  36. | Lisi|
  37. | Wangwu|
  38. | Zhaoliu|
  39. | Sunqi|
  40. | Zhouba|
  41. | Wujiu|
  42. | Zhengshi|
  43. +---------------------+
  44. */

2.3 代码解析

1. 创建SparkSession:

使用local模式进行测试

  1. val sparkSession = SparkSession.builder()
  2. .master("local")
  3. .appName("Kotlin Spark UDF")
  4. .orCreate
2. 读取数据并创建视图:

创建名为user的视图

  1. sparkSession.read().json("data/user.txt")
  2. .createOrReplaceTempView("user")
3. 注册UDF函数:

使用kotlin的lambda表达式来简化UDF函数的创建 要注意这里是Java中的UDF1{}而不是Scala中的Function1{}

  1. sparkSession.udf().register("nameHeaderUpper",
  2. UDF1 { name: String ->
  3. name.substring(0, 1).uppercase(Locale.getDefault()) + name.substring(1)
  4. },
  5. DataTypes.StringType)
4. 执行SQL查询:

用sparkSession对象调用sql方法进行查询 并将结果展示到控制台

  1. sparkSession.sql("select nameHeaderUpper(name) as name from user").show()
5. 停止SparkSession:

释放资源
调用close和stop方法都可以

  1. sparkSession.stop()

3. UDAF的使用与实现

UDAF,即用户自定义聚合函数,允许用户定义复杂的聚合逻辑,如求平均值、总和等。

下面是一个简单案例来实现求年龄的平均值

3.1 代码示例

  1. import org.apache.spark.sql.Encoder
  2. import org.apache.spark.sql.Encoders
  3. import org.apache.spark.sql.SparkSession
  4. import org.apache.spark.sql.expressions.Aggregator
  5. import org.apache.spark.sql.functions
  6. import java.io.Serializable
  7. class SparkSQL_UDAF {
  8. fun f1() {
  9. val sparkSession = SparkSession.builder()
  10. .master("local")
  11. .appName("Kotlin Spark UDAF")
  12. .orCreate
  13. // 定义聚合器
  14. val agg = object : Aggregator<Long, Buffer, Long>(){
  15. override fun reduce(b: Buffer?, a: Long?): Buffer {
  16. val updatedBuffer = b ?: Buffer(0, 0)
  17. updatedBuffer.cnt++
  18. updatedBuffer.count += a!!
  19. return updatedBuffer
  20. }
  21. override fun outputEncoder(): Encoder<Long> {
  22. return Encoders.LONG()
  23. }
  24. override fun zero(): Buffer {
  25. return Buffer(0L, 0L)
  26. }
  27. override fun bufferEncoder(): Encoder<Buffer> {
  28. return Encoders.bean(Buffer::class.java)
  29. }
  30. override fun finish(reduction: Buffer?): Long {
  31. return reduction?.count?.div(reduction.cnt) ?: 0L
  32. }
  33. override fun merge(b1: Buffer?, b2: Buffer?): Buffer {
  34. return Buffer(
  35. count = (b1?.count ?: 0) + (b2?.count ?: 0),
  36. cnt = (b1?.cnt ?: 0) + (b2?.cnt ?: 0)
  37. )
  38. }
  39. }
  40. // 注册UDAF函数
  41. sparkSession.udf().register("avgAge", functions.udaf(agg, Encoders.LONG()))
  42. // 使用注册的UDAF函数进行SQL查询
  43. sparkSession.read().json("data/user.txt").createOrReplaceTempView("user")
  44. sparkSession.sql("select avgAge(age) as avg_age from user").show()
  45. sparkSession.stop()
  46. }
  47. companion object {
  48. @JvmStatic
  49. fun main(args: Array<String>) {
  50. SparkSQL_UDAF().f1()
  51. }
  52. }
  53. }
  54. data class Buffer(var count: Long, var cnt: Long) : Serializable {
  55. constructor() : this(0, 0)
  56. }
  57. /*输出结果
  58. +-----------+
  59. |avgage(age)|
  60. +-----------+
  61. | 22|
  62. +-----------+
  63. */

3.2 代码解析

1. Buffer数据类:

用来缓存聚合的中间过程的类 这里不能使用Scala中的二元组和kotlin中的Pair类 因为都不能修改其中的数据 而且需注意 需要有空参构造 (Kotlin中的data class 默认没有空参构造)

  1. data class Buffer(var count: Long, var cnt: Long) : Serializable {
  2. constructor() : this(0, 0)
  3. }
2. 定义UDAF函数:

为了看着清晰 创建了一个匿名内部类对象agg 继承自Aggregator 需要定义输入输出和缓存的泛型

实现其对应的方法

  1. val agg = object : Aggregator<Long, Buffer, Long>(){
  2. override fun reduce(b: Buffer?, a: Long?): Buffer {
  3. val updatedBuffer = b ?: Buffer(0, 0)
  4. updatedBuffer.cnt++
  5. updatedBuffer.count += a!!
  6. return updatedBuffer
  7. }
  8. override fun outputEncoder(): Encoder<Long> {
  9. return Encoders.LONG()
  10. }
  11. override fun zero(): Buffer {
  12. return Buffer(0L, 0L)
  13. }
  14. override fun bufferEncoder(): Encoder<Buffer> {
  15. return Encoders.bean(Buffer::class.java)
  16. }
  17. override fun finish(reduction: Buffer?): Long {
  18. return reduction?.count?.div(reduction.cnt) ?: 0L
  19. }
  20. override fun merge(b1: Buffer?, b2: Buffer?): Buffer {
  21. return Buffer(
  22. count = (b1?.count ?: 0) + (b2?.count ?: 0),
  23. cnt = (b1?.cnt ?: 0) + (b2?.cnt ?: 0)
  24. )
  25. }
  26. }

其中 reduce是对输入的数据进行聚合(这里是累加)

outputEncoder和bufferEncoder是将输出和缓存的结果进行序列化

zero是对数据赋初值

merge是 多分区的数据进行合并时调用的方法 用于将缓存数据合并 (这里时Buffer类)

finish是输出最终结果

要十分注意在这些方法中的空安全处理(有时候kt的空安全挺烦人的) 不要轻易用非空断言

3. 注册UDAF函数:

使用functions.udaf()注册 要指明输出结果的类型

  1. sparkSession.udf().register("avgAge", functions.udaf(agg, Encoders.LONG()))
4. 执行SQL查询:

将读入的数据注册为视图 调用sparkSession的sql方法查询 并将结果在控制台打印输出

  1. sparkSession.read().json("data/user.txt").createOrReplaceTempView("user")
  2. sparkSession.sql("select avgAge(age) as avg_age from user").show()
5.释放资源:

调用stop过close方法释放资源

  1. sparkSession.stop()

4. 总结

在不适用spark kotlin api的情况下 用kotlin来写SparkSql基本是使用spark提供的Java api 因为kt与Scala不是完全兼容 所以要注意其中的一些高阶函数还有元组的使用和序列化等问题

标签: spark kotlin

本文转载自: https://blog.csdn.net/qq_42531534/article/details/141526505
版权归原作者 羡慕怪_21 所有, 如有侵权,请联系我们删除。

“在Kotlin中使用Spark SQL的UDF和UDAF函数”的评论:

还没有评论