Spark Streaming中对DStream的转换会转变成对RDD的转换操作,流程如下:
其中,lines表示转换操作前的DStream,words表示转换操作后生成的DStream。对lines做flatMap转换操作,也就是对它内部的所有RDD做flatMap转换操作。
接下来,列举DStream API 提供的与转换操作相关的方法。
在以上表中,列举了一些DStream API提供的与转换操作相关的方法。DStream API提供的与转换操作相关的方法和RDD API有些不同,不同之处在于RDD API 中没有提供transform()和updateStateByKey()两个方法。
1、transform()
(1)启动master,slave1,slave2三个节点的kafka服务。
bin/kafka-server-start.sh.config/server.properties
(2)在master节点下载nc服务并开启服务
下载命令:yum install nc
执行命令 nc-lk 9999启动服务器且监听Socket服务,并输入数据 I am learning Spark Streaming now.
(3)打开IDEA工具,创建一个名为spark03的Maven项目。
(4)配置pom.xml文件,引入Spark Streaming相关依赖和设置源代码的存储路径,具体内容如下:
<dependencies>
<!--引入Scala编程依赖库-->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<!--引入Spark核心依赖-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.2</version>
</dependency>
<!--引入sparkStreaming-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.0.2</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
</build>
(5)配置好pom.xml文件后,在/src/main和/src/test目录下分别创建scala目录,用来防止sourceDirectory和testDirectory标签提示错误。
(6)在spark03项目的/src/main/scala目录下创建一个名为itcast的包,接着在包下创建名为TransformTest的scala类,编写以下代码:
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
object TransformTest {
def main(args: Array[String]): Unit = {
//创建SparkConf对象
val sparkConf : SparkConf = new SparkConf()
.setAppName("TransformTest").setMaster("local[2]")
//创建SparkContext对象
val sc : SparkContext = new SparkContext(sparkConf)
//设置日志级别
sc.setLogLevel("WARN")
//创建StreamingContext
val ssc : StreamingContext = new StreamingContext(sc,Seconds(5))
//连接socket服务
val dstream :ReceiverInputDStream[String] =
ssc.socketTextStream("192.168.196.101",9999)
//使用RDD-to-RDD函数
val words : DStream[String] = dstream.transform(rdd => rdd.flatMap(_.split(" ")))
//打印输出
words.print()
//开启流式计算
ssc.start()
//用于保持程序一直运行
ssc.awaitTermination()
}
}
运行结果如下图所示:
2、updateStateByKey()
在spark03项目的/src/main/scala/itcast目录下创建一个名为updataStateByKeyTest的scala类,编写以下内容:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
object UpdateStateByKeyTest {
//newValues表示当前批次汇总成的(k,v)中的相同k的所有v
//runningCount表示历史的所有相同key的value总和
def updateFunction(newValues : Seq[Int],runningCount:Option[Int]):
Option[Int] = {
val newCount = runningCount.getOrElse(0) + newValues.sum
Some(newCount)
}
def main(args: Array[String]): Unit = {
//创建SparkConf对象
val sparkConf: SparkConf = new SparkConf()
.setAppName("UpdateStateByKeyTest").setMaster("local[2]")
//创建SparkContext对象
val sc: SparkContext = new SparkContext(sparkConf)
//设置日志级别
sc.setLogLevel("WARN")
//创建StreamingContext
val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
//配置检查点目录
ssc.checkpoint("./")
//连接socket服务
val dstream: ReceiverInputDStream[String] =
ssc.socketTextStream("192.168.196.101", 9999)
//按空格切分每一行
val wordAndOne : DStream[(String,Int)]= dstream.flatMap(_.split(" ")).map(word =>(word,1))
//调用updateStateByKey操作
var result : DStream[(String,Int)] = wordAndOne.updateStateByKey(updateFunction)
//打印输出
result.print()
//开启流式计算
ssc.start()
//用于保持程序一直运行
ssc.awaitTermination()
}
}
先在IDEA中的updataStateByKeyTest运行代码,然后再到master节点不断输入单词,具体内容如下:
nc -lk 9999
hadoop spark itcast
spark itcast
运行结果如下:
版权归原作者 鄙人阿彬 所有, 如有侵权,请联系我们删除。