0


DStream转换操作

    Spark Streaming中对DStream的转换会转变成对RDD的转换操作,流程如下:

其中,lines表示转换操作前的DStream,words表示转换操作后生成的DStream。对lines做flatMap转换操作,也就是对它内部的所有RDD做flatMap转换操作。

接下来,列举DStream API 提供的与转换操作相关的方法。

在以上表中,列举了一些DStream API提供的与转换操作相关的方法。DStream API提供的与转换操作相关的方法和RDD API有些不同,不同之处在于RDD API 中没有提供transform()和updateStateByKey()两个方法。

1、transform()

(1)启动master,slave1,slave2三个节点的kafka服务。

bin/kafka-server-start.sh.config/server.properties

(2)在master节点下载nc服务并开启服务

下载命令:yum install nc

执行命令 nc-lk 9999启动服务器且监听Socket服务,并输入数据 I am learning Spark Streaming now.

(3)打开IDEA工具,创建一个名为spark03的Maven项目。

(4)配置pom.xml文件,引入Spark Streaming相关依赖和设置源代码的存储路径,具体内容如下:

<dependencies>
        <!--引入Scala编程依赖库-->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>

        <!--引入Spark核心依赖-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.0.2</version>
        </dependency>

        <!--引入sparkStreaming-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.0.2</version>
        </dependency>

    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
    </build>

(5)配置好pom.xml文件后,在/src/main和/src/test目录下分别创建scala目录,用来防止sourceDirectory和testDirectory标签提示错误。

(6)在spark03项目的/src/main/scala目录下创建一个名为itcast的包,接着在包下创建名为TransformTest的scala类,编写以下代码:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}

object TransformTest {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val sparkConf : SparkConf = new SparkConf()
      .setAppName("TransformTest").setMaster("local[2]")

    //创建SparkContext对象
    val sc : SparkContext = new SparkContext(sparkConf)
    //设置日志级别
    sc.setLogLevel("WARN")

    //创建StreamingContext
    val ssc : StreamingContext = new StreamingContext(sc,Seconds(5))
    //连接socket服务
    val dstream :ReceiverInputDStream[String] =
      ssc.socketTextStream("192.168.196.101",9999)
    //使用RDD-to-RDD函数
    val words : DStream[String] = dstream.transform(rdd => rdd.flatMap(_.split(" ")))
    //打印输出
    words.print()
    //开启流式计算
    ssc.start()
    //用于保持程序一直运行
    ssc.awaitTermination()
  }
}

运行结果如下图所示:

2、updateStateByKey()

在spark03项目的/src/main/scala/itcast目录下创建一个名为updataStateByKeyTest的scala类,编写以下内容:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object UpdateStateByKeyTest {
  //newValues表示当前批次汇总成的(k,v)中的相同k的所有v
  //runningCount表示历史的所有相同key的value总和
  def updateFunction(newValues : Seq[Int],runningCount:Option[Int]):
  Option[Int] = {
    val newCount = runningCount.getOrElse(0) + newValues.sum
    Some(newCount)
  }

  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("UpdateStateByKeyTest").setMaster("local[2]")

    //创建SparkContext对象
    val sc: SparkContext = new SparkContext(sparkConf)
    //设置日志级别
    sc.setLogLevel("WARN")

    //创建StreamingContext
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))

    //配置检查点目录
    ssc.checkpoint("./")

    //连接socket服务
    val dstream: ReceiverInputDStream[String] =
      ssc.socketTextStream("192.168.196.101", 9999)
    //按空格切分每一行
    val wordAndOne : DStream[(String,Int)]= dstream.flatMap(_.split(" ")).map(word =>(word,1))

    //调用updateStateByKey操作
    var result : DStream[(String,Int)] = wordAndOne.updateStateByKey(updateFunction)
    //打印输出
    result.print()
    //开启流式计算
    ssc.start()
    //用于保持程序一直运行
    ssc.awaitTermination()

  }
}

先在IDEA中的updataStateByKeyTest运行代码,然后再到master节点不断输入单词,具体内容如下:

nc -lk 9999

hadoop spark itcast

spark itcast

运行结果如下:

标签: spark scala hadoop

本文转载自: https://blog.csdn.net/m0_59839948/article/details/125314402
版权归原作者 鄙人阿彬 所有, 如有侵权,请联系我们删除。

“DStream转换操作”的评论:

还没有评论