0


DStream转换操作

  1. Spark Streaming中对DStream的转换会转变成对RDD的转换操作,流程如下:

其中,lines表示转换操作前的DStream,words表示转换操作后生成的DStream。对lines做flatMap转换操作,也就是对它内部的所有RDD做flatMap转换操作。

接下来,列举DStream API 提供的与转换操作相关的方法。

在以上表中,列举了一些DStream API提供的与转换操作相关的方法。DStream API提供的与转换操作相关的方法和RDD API有些不同,不同之处在于RDD API 中没有提供transform()和updateStateByKey()两个方法。

1、transform()

(1)启动master,slave1,slave2三个节点的kafka服务。

bin/kafka-server-start.sh.config/server.properties

(2)在master节点下载nc服务并开启服务

下载命令:yum install nc

执行命令 nc-lk 9999启动服务器且监听Socket服务,并输入数据 I am learning Spark Streaming now.

(3)打开IDEA工具,创建一个名为spark03的Maven项目。

(4)配置pom.xml文件,引入Spark Streaming相关依赖和设置源代码的存储路径,具体内容如下:

  1. <dependencies>
  2. <!--引入Scala编程依赖库-->
  3. <dependency>
  4. <groupId>org.scala-lang</groupId>
  5. <artifactId>scala-library</artifactId>
  6. <version>2.11.8</version>
  7. </dependency>
  8. <!--引入Spark核心依赖-->
  9. <dependency>
  10. <groupId>org.apache.spark</groupId>
  11. <artifactId>spark-core_2.11</artifactId>
  12. <version>2.0.2</version>
  13. </dependency>
  14. <!--引入sparkStreaming-->
  15. <dependency>
  16. <groupId>org.apache.spark</groupId>
  17. <artifactId>spark-streaming_2.11</artifactId>
  18. <version>2.0.2</version>
  19. </dependency>
  20. </dependencies>
  21. <build>
  22. <sourceDirectory>src/main/scala</sourceDirectory>
  23. <testSourceDirectory>src/test/scala</testSourceDirectory>
  24. </build>

(5)配置好pom.xml文件后,在/src/main和/src/test目录下分别创建scala目录,用来防止sourceDirectory和testDirectory标签提示错误。

(6)在spark03项目的/src/main/scala目录下创建一个名为itcast的包,接着在包下创建名为TransformTest的scala类,编写以下代码:

  1. import org.apache.spark.streaming.{Seconds, StreamingContext}
  2. import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object TransformTest {
  5. def main(args: Array[String]): Unit = {
  6. //创建SparkConf对象
  7. val sparkConf : SparkConf = new SparkConf()
  8. .setAppName("TransformTest").setMaster("local[2]")
  9. //创建SparkContext对象
  10. val sc : SparkContext = new SparkContext(sparkConf)
  11. //设置日志级别
  12. sc.setLogLevel("WARN")
  13. //创建StreamingContext
  14. val ssc : StreamingContext = new StreamingContext(sc,Seconds(5))
  15. //连接socket服务
  16. val dstream :ReceiverInputDStream[String] =
  17. ssc.socketTextStream("192.168.196.101",9999)
  18. //使用RDD-to-RDD函数
  19. val words : DStream[String] = dstream.transform(rdd => rdd.flatMap(_.split(" ")))
  20. //打印输出
  21. words.print()
  22. //开启流式计算
  23. ssc.start()
  24. //用于保持程序一直运行
  25. ssc.awaitTermination()
  26. }
  27. }

运行结果如下图所示:

2、updateStateByKey()

在spark03项目的/src/main/scala/itcast目录下创建一个名为updataStateByKeyTest的scala类,编写以下内容:

  1. import org.apache.spark.{SparkConf, SparkContext}
  2. import org.apache.spark.streaming.{Seconds, StreamingContext}
  3. import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
  4. object UpdateStateByKeyTest {
  5. //newValues表示当前批次汇总成的(k,v)中的相同k的所有v
  6. //runningCount表示历史的所有相同key的value总和
  7. def updateFunction(newValues : Seq[Int],runningCount:Option[Int]):
  8. Option[Int] = {
  9. val newCount = runningCount.getOrElse(0) + newValues.sum
  10. Some(newCount)
  11. }
  12. def main(args: Array[String]): Unit = {
  13. //创建SparkConf对象
  14. val sparkConf: SparkConf = new SparkConf()
  15. .setAppName("UpdateStateByKeyTest").setMaster("local[2]")
  16. //创建SparkContext对象
  17. val sc: SparkContext = new SparkContext(sparkConf)
  18. //设置日志级别
  19. sc.setLogLevel("WARN")
  20. //创建StreamingContext
  21. val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
  22. //配置检查点目录
  23. ssc.checkpoint("./")
  24. //连接socket服务
  25. val dstream: ReceiverInputDStream[String] =
  26. ssc.socketTextStream("192.168.196.101", 9999)
  27. //按空格切分每一行
  28. val wordAndOne : DStream[(String,Int)]= dstream.flatMap(_.split(" ")).map(word =>(word,1))
  29. //调用updateStateByKey操作
  30. var result : DStream[(String,Int)] = wordAndOne.updateStateByKey(updateFunction)
  31. //打印输出
  32. result.print()
  33. //开启流式计算
  34. ssc.start()
  35. //用于保持程序一直运行
  36. ssc.awaitTermination()
  37. }
  38. }

先在IDEA中的updataStateByKeyTest运行代码,然后再到master节点不断输入单词,具体内容如下:

nc -lk 9999

hadoop spark itcast

spark itcast

运行结果如下:

标签: spark scala hadoop

本文转载自: https://blog.csdn.net/m0_59839948/article/details/125314402
版权归原作者 鄙人阿彬 所有, 如有侵权,请联系我们删除。

“DStream转换操作”的评论:

还没有评论