0


大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式

喜大普奔!破百了!

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(正在更新!)

章节内容

上节我们完成了如下的内容:

  • Spark Streaming 基础数据源
  • 文件流、Socket流、RDD队列流
  • 引入依赖、Java编写多种流进行测试

在这里插入图片描述

DStream 转换

DStream上的操作与RDD类似,分为Transformations(转换)和 Output Operations(输出)两种,此外转换操作中还有一些比较特殊的方法,如:

  • updateStateByKey
  • transform
  • window相关操作

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

map(func)

对 DStream 中的每个元素应用 func 函数,并返回一个新的 DStream。
例如,将每个记录转换为其长度。
示例:val lengths = lines.map(line => line.length)

flatMap(func)

对 DStream 中的每个元素应用 func 函数,并将结果展平(即将集合的集合展开)。
例如,将每一行文本拆分为单词。
示例:val words = lines.flatMap(line => line.split(" "))

filter(func)

对 DStream 中的每个元素应用 func 函数,并保留返回值为 true 的元素。
例如,过滤掉长度小于 5 的单词。
示例:val filteredWords = words.filter(word => word.length > 5)

reduceByKey(func)

对键值对 DStream 进行聚合操作,对具有相同键的元素应用 func 函数。
例如,计算每个单词的总数。
示例:val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

groupByKey()

对键值对 DStream 中的每个键进行分组,并将具有相同键的值聚合到一个列表中。
示例:val grouped = pairs.groupByKey()

count()

统计 DStream 中每个 RDD 的元素个数。
示例:val count = words.count()

countByValue()

统计 DStream 中每个 RDD 中每个值的出现次数。
示例:val valueCounts = words.countByValue()

union(otherDStream)

将两个 DStream 合并为一个新的 DStream,包含两个 DStream 中的所有元素。
示例:val mergedStream = stream1.union(stream2)

join(otherDStream)

对两个键值对 DStream 进行连接操作,类似 SQL 中的 JOIN 操作。
示例:val joinedStream = stream1.join(stream2)

备注:

  • 在DStream与RDD上的转换操作非常类似(无状态操作)
  • DStream有自己特殊的操作(窗口操作、追踪状态变化操作)
  • 在DStream上的转换操作比RDD上的转换操作少

DStream 的转换操作可以分为 无状态(stateless)和 有状态(stateful)两种:

  • 无状态转换操作,每个批次的处理不依赖与之前批次的数据,常见的RDD转化操作,例如:map、Filter、reduceByKey等
  • 有状态转换操作,需要使用之前批次的数据或者是中间结果来计算当前批次的数据,有状态转换操作包括:基于滑动窗口的转换操作或追踪状态变化的转化操作

无状态转换

无状态转换操作就是把简单的RDD转换操作应用到每个批次上,也就是转换DStream中的每一个RDD。
常见的无状态转换包括:

  • map
  • flatMap
  • repartition
  • reduceByKey
  • groupByKey

重要的转换操作:transform,通过对源DStream的每个RDD应用RDD-To-RDD函数,创建一个新的DStream,支持在新的DStream中任何RDD操作。
这是一个功能强大的函数,它可以允许开发者直接操作其内部的RDD,也就是说开发者,可以任意提供一个RDDToRDD的函数,这个函数在数据流每个批次中都被调用,生成一个新的流。

案例1 黑名单过滤

假设:arr1为黑名单数据(自定义),true表示数据生效,需要被过滤掉;false表示数据
未生效
val arr1 = Array(("spark", true),("scala", false))
假设:流式数据格式为"time word",需要根据黑名单中的数据对流式数据执行过滤操
作。如"2 spark"要被过滤掉
1 hadoop
2 spark
3 scala
4java5 hive
结果:"2 spark" 被过滤

方案1 外连接实现

packageicu.wzkimportorg.apache.spark.SparkConf
importorg.apache.spark.rdd.RDD
importorg.apache.spark.streaming.dstream.ConstantInputDStream
importorg.apache.spark.streaming.{Seconds, StreamingContext}object BlackListFilter1 {def main(args: Array[String]):Unit={val conf =new SparkConf().setAppName("BlackListFilter1").setMaster("local[*]")val ssc =new StreamingContext(conf, Seconds(10))// 黑名单val blackList = Array(("spark",true),("scala",true))val blackListRDD = ssc.sparkContext.makeRDD(blackList)// 测试数据val strArray: Array[String]="spark java scala hadoop kafka hive hbase zookeeper".split("\\s+").zipWithIndex
      .map {case(word, index)=>s"$index$word"}val rdd = ssc.sparkContext.makeRDD(strArray)val clickStream =new ConstantInputDStream(ssc, rdd)// 流式数据的处理val clickStreamFormatted = clickStream
      .map(value =>(value.split(" ")(1), value))
    clickStreamFormatted.transform(clickRDD =>{val joinedBlockListRDD: RDD[(String,(String, Option[Boolean]))]= clickRDD.leftOuterJoin(blackListRDD)
      joinedBlockListRDD.filter {case(word,(streamingLine, flag))=>{if(flag.getOrElse(false)){false}else{true}}}.map {case(word,(streamingLine, flag))=> streamingLine
      }}).print()// 启动
    ssc.start()
    ssc.awaitTermination()}}

方案1 运行结果

-------------------------------------------
Time: 1721618670000 ms
-------------------------------------------
5 hive
6 hbase
1java7 zookeeper
3 hadoop
4 kafka

... 下一批

对应的结果如下图所示:
在这里插入图片描述

方案2 SQL实现

packageicu.wzkimportorg.apache.spark.SparkConf
importorg.apache.spark.sql.{DataFrame, SparkSession}importorg.apache.spark.streaming.dstream.ConstantInputDStream
importorg.apache.spark.streaming.{Seconds, StreamingContext}object BlackListFilter2 {def main(args: Array[String]):Unit={val conf =new SparkConf().setAppName("BlackListFilter2").setMaster("local[*]")val ssc =new StreamingContext(conf, Seconds(10))
    ssc.sparkContext.setLogLevel("WARN")// 黑名单val blackList = Array(("spark",true),("scala",true))val blackListRDD = ssc.sparkContext.makeRDD(blackList)// 生成测试 DStreamval strArray: Array[String]="spark java scala hadoop kafka hive hbase zookeeper".split("\\s+").zipWithIndex
      .map {case(word, index)=>s"$index$word"}val rdd = ssc.sparkContext.makeRDD(strArray)val clickStream =new ConstantInputDStream(ssc, rdd)// 流式数据的处理val clickStreamFormatted = clickStream
      .map(value =>(value.split(" ")(1), value))
    clickStreamFormatted.transform {
      clickRDD =>val spark = SparkSession
          .builder().config(rdd.sparkContext.getConf).getOrCreate()importspark.implicits._
        val clickDF: DataFrame = clickRDD.toDF("word","line")val blackDF: DataFrame = blackListRDD.toDF("word","flag")
        clickDF.join(blackDF, Seq("word"),"left").filter("flag is null or flag == false").select("line").rdd
    }.print()

    ssc.start()
    ssc.awaitTermination()}}

方案2 SQL运行结果

-------------------------------------------
Time: 1721619900000 ms
-------------------------------------------
[6 hbase][4 kafka][7 zookeeper][1 java][3 hadoop][5 hive]

运行结果截图如下图所示:
在这里插入图片描述

方案3 直接过滤

packageicu.wzkimportorg.apache.spark.SparkConf
importorg.apache.spark.broadcast.Broadcast
importorg.apache.spark.streaming.dstream.ConstantInputDStream
importorg.apache.spark.streaming.{Seconds, StreamingContext}object BlackListFilter3 {def main(args: Array[String]):Unit={val conf =new SparkConf().setAppName("BlackListFilter3").setMaster("local[*]")val ssc =new StreamingContext(conf, Seconds(10))
    ssc.sparkContext.setLogLevel("WARN")// 黑名单val blackList = Array(("spark",true),("scala",true))val blackListBC: Broadcast[Array[String]]= ssc
      .sparkContext
      .broadcast(blackList.filter(_._2).map(_._1))// 生成测试DStreamval strArray: Array[String]="spark java scala hadoop kafka hive hbase zookeeper".split("\\s+").zipWithIndex
      .map {case(word, index)=>s"$index$word"}val rdd = ssc.sparkContext.makeRDD(strArray)val clickStream =new ConstantInputDStream(ssc, rdd)// 流式数据的处理
    clickStream.map(value =>(value.split(" ")(1), value)).filter {case(word, _)=>!blackListBC.value.contains(word)}.map(_._2).print()// 启动
    ssc.start()
    ssc.awaitTermination()}}

方案3 直接过滤运行结果

-------------------------------------------
Time: 1721627600000 ms
-------------------------------------------
1java3 hadoop
4 kafka
5 hive
6 hbase
7 zookeeper

... 下一批

运行结果如下图所示:
在这里插入图片描述


本文转载自: https://blog.csdn.net/w776341482/article/details/141492146
版权归原作者 武子康 所有, 如有侵权,请联系我们删除。

“大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式”的评论:

还没有评论