0


DStream窗口操作

在SparkStreaming中,为DStream提供窗口操作,即在 DStream流上,将一个可配置的长度设置为窗口,以一个可配置的速率向前移动窗口。根据窗口操作,对窗口内的数据进行计算,每次落在窗口内的RDD数据会被聚合起来计算,生成的RDD会作为WindowDStream的一个RDD。

DStream API提供的与窗口操作相关的方法:

1、window()

在spark03项目的/src/main/scala/itcast目录下创建一个名为WindowTest的scala类,编写以下内容:

  1. import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
  2. import org.apache.spark.streaming.{Seconds, StreamingContext}
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object WindowTest {
  5. def main(args: Array[String]): Unit = {
  6. //创建SparkConf对象
  7. val sparkConf : SparkConf = new SparkConf()
  8. .setAppName("WindowTest").setMaster("local[2]")
  9. //创建SparkContext对象
  10. val sc : SparkContext = new SparkContext(sparkConf)
  11. //设置日志级别
  12. sc.setLogLevel("WARN")
  13. //创建StreamingContext
  14. val ssc : StreamingContext = new StreamingContext(sc,Seconds(1))
  15. //连接socket服务
  16. val dstream: ReceiverInputDStream[String] =
  17. ssc.socketTextStream("192.168.196.101", 9999)
  18. //按空格切分每一行
  19. val words:DStream[String]= dstream.flatMap(_.split(""))
  20. //调用Window操作,需要两个参数
  21. val windowWords : DStream[String]= words.window(Seconds(3),Seconds(1))
  22. //打印输出
  23. windowWords.print()
  24. //开启流式计算
  25. ssc.start()
  26. //用于保持程序一直运行
  27. ssc.awaitTermination()
  28. }
  29. }

先在IDEA中的WindowTest运行代码,然后再到master节点每隔一秒输入数字,运行结果如下:

2、reduceByKeyAndWindow()

在spark03项目的/src/main/scala/itcast目录下创建一个名为ReduceByKeyAndWindowTest的scala类,编写以下内容:

  1. import org.apache.spark.streaming.{Seconds, StreamingContext}
  2. import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object ReduceByKeyAndWindowTest {
  5. def main(args: Array[String]): Unit = {
  6. //创建SparkConf对象
  7. val sparkConf : SparkConf = new SparkConf()
  8. .setAppName("ReduceByKeyAndWindowTest").setMaster("local[2]")
  9. //创建SparkContext对象
  10. val sc : SparkContext = new SparkContext(sparkConf)
  11. //设置日志级别
  12. sc.setLogLevel("WARN")
  13. //创建StreamingContext
  14. val ssc : StreamingContext = new StreamingContext(sc,Seconds(1))
  15. //连接socket服务
  16. val dstream: ReceiverInputDStream[String] =
  17. ssc.socketTextStream("192.168.196.101", 9999)
  18. //按空格切分每一行
  19. val wordAnOne : DStream[(String,Int)]= dstream.flatMap(_.split(" ")).map(word =>(word,1))
  20. //调用ReduceByKeyAndWindowTest操作
  21. val windowWords:DStream[(String,Int)]= wordAnOne.reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(3),Seconds(1))
  22. //打印输出
  23. windowWords.print()
  24. //开启流式计算
  25. ssc.start()
  26. //用于保持程序一直运行
  27. ssc.awaitTermination()
  28. }
  29. }

先在IDEA中的ReduceByKeyAndWindowTest运行代码,然后再到master节点每隔一秒输入字母,运行结果如下:


本文转载自: https://blog.csdn.net/m0_59839948/article/details/125341121
版权归原作者 鄙人阿彬 所有, 如有侵权,请联系我们删除。

“DStream窗口操作”的评论:

还没有评论