0


PySpark中的StructStreaming的使用

使用pyspark编写StructStreaming的入门案例,如有雷同,纯属巧合,所有代码亲测可用。

一、SparkStreaming 的不足

1.基于微批,延迟高不能做到真正的实时

2.DStream基于RDD,不直接支持SQL

3.流批处理的API应用层不统一,(流用的DStream-底层是RDD,批用的DF/DS/RDD)

4.不支持EventTime事件时间

5.数据的Exactly-Once(恰好一次语义)需要手动实现

二、StructuredStreaming 的介绍

Spark在2016年Spark2.0版本中发布了新的流计算的API:Structured streaming结构化流。Structured streaming是一个基于SparkSOL引擎的可扩展、容错的全新的流处理引擎。

structured Streaming统一了流、批的编程模型,可以让用户像编写批处理程序一样简单地编写高性能的流处理程序Structured streaming并不是对Spark Streaming的简单改进,而是吸取了在开发SparkSQL和Sparkstreaming过程中的经验教训,以及Spark社区和Databricks众多客户的反馈,重新开发的全新流式引擎,致力于为批处理和流处理提供统一的高性能API。

在Structured streaming这个新的引擎中,也实现了之前在SparkStreaming中没有的一些功能,比如Event Time(事件时间)的支持,Stream-Streamjoin(2.3.0),Continuous Processing毫秒级延迟(2.3.0)。同时也考虑了和Spark 其他组件更好的集成。

Structured Streaming Programming Guide - Spark 3.5.3 Documentation

三、编程模型

四、Source

Structured Streaming Programming Guide - Spark 3.5.3 Documentation

案例一:Socket

  1. import os
  2. from pyspark.sql import SparkSession
  3. from pyspark.sql.functions import explode
  4. from pyspark.sql.functions import split
  5. if __name__ == '__main__':
  6. # 配置环境
  7. os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'
  8. # 配置Hadoop的路径,就是前面解压的那个路径
  9. os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'
  10. # 配置base环境Python解析器的路径
  11. os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径
  12. os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
  13. os.environ['HADOOP_USER_NAME'] = 'root'
  14. spark = SparkSession \
  15. .builder \
  16. .appName("StructuredNetworkWordCount") \
  17. .getOrCreate()
  18. # Create DataFrame representing the stream of input lines from connection to localhost:9999
  19. lines = spark \
  20. .readStream \
  21. .format("socket") \
  22. .option("host", "bigdata01") \
  23. .option("port", 9999) \
  24. .load()
  25. # Split the lines into words
  26. words = lines.select(
  27. explode(
  28. split(lines.value, " ")
  29. ).alias("word")
  30. )
  31. # Generate running word count
  32. wordCounts = words.groupBy("word").count()
  33. # Start running the query that prints the running counts to the console
  34. query = wordCounts \
  35. .writeStream \
  36. .outputMode("complete") \
  37. .format("console") \
  38. .start()
  39. query.awaitTermination()
  40. spa
标签: ajax 前端 javascript

本文转载自: https://blog.csdn.net/wozhendeyumenle/article/details/143622284
版权归原作者 二进制_博客 所有, 如有侵权,请联系我们删除。

“PySpark中的StructStreaming的使用”的评论:

还没有评论