0


Structured_Streaming和Kafka整合

结构化编程模型

输出终端/位置

默认情况下,Spark的结构化流支持多种输出方案:

  1. 1- console sink: 将结果数据输出到控制台。主要是用在测试中,并且支持3种输出模式
  2. 2- File sink: 输出到文件。将结果数据输出到某个目录下,形成文件数据。只支持append模式
  3. 3- foreach sink foreachBatch sink: 将数据进行遍历处理。遍历后输出到哪里,取决于自定义函数。并且支持3种输出模式
  4. 4- memory sink: 将结果数据输出到内存中。主要目的是进行再次的迭代处理。数据大小不能过大。支持append模式和complete模式
  5. 5- Kafka sink: 将结果数据输出到Kafka中。类似于Kafka中的生产者角色。并且支持3种输出模式

File Sink

  1. import os
  2. from pyspark.sql import SparkSession
  3. import pyspark.sql.functions as F
  4. # 绑定指定的Python解释器
  5. os.environ['SPARK_HOME']='/export/server/spark'
  6. os.environ['PYSPARK_PYTHON']='/root/anaconda3/bin/python3'
  7. os.environ['PYSPARK_DRIVER_PYTHON']='/root/anaconda3/bin/python3'if __name__ =='__main__':# 1- 创建SparkSession对象
  8. spark = SparkSession.builder\
  9. .config("spark.sql.shuffle.partitions",1)\
  10. .appName('file_sink')\
  11. .master('local[1]')\
  12. .getOrCreate()# 输出到文件必须要设置checkpointLocation检查点路径
  13. spark.conf.set("spark.sql.streaming.checkpointLocation","hdfs://node1:8020/chk")# 2- 数据输入
  14. init_df = spark.readStream\
  15. .format("socket")\
  16. .option("host","192.168.88.161")\
  17. .option("port","55555")\
  18. .load()# 3- 数据处理# 4- 数据输出# 5- 启动流式任务"""
  19. File Sink总结:
  20. 1- 输出到文件必须要设置checkpointLocation检查点路径
  21. 2- 因为结构化流底层是微批处理,如果不手动指定处理间隔,程序会尽可能缩短两个批处理间的间隔。那么会导致小文件问题的产生
  22. 3- 我们可以使用触发器trigger来指定批处理的时间间隔,用来减少小文件的产生
  23. """
  24. init_df.writeStream\
  25. .format("csv")\
  26. .outputMode("append")\
  27. .option("sep",",")\
  28. .option("header","True")\
  29. .option("encoding","UTF-8")\
  30. .trigger(processingTime="20 seconds")\
  31. .start("file:///export/data/child")\
  32. .awaitTermination()
  1. file sink总结:
  2. 1- 要设置检查点数据存放路径。否则会报如下错误
  3. AnalysisException: checkpointLocation must be specified either through
  4. 2- 因为结构化流底层是微批处理,如果不手动指定处理间隔,程序会尽可能缩短两个批处理间的间隔。那么会导致小文件问题的产生
  5. 3- 可以通过触发器trigger解决小文件的问题。可以通过触发器来调整每一批次产生间隔的时间
  6. 4- 支持输出到本地文件系统和HDFS文件系统

foreach sink
允许对输出的数据进行任意的处理操作,具体如何处理由用户自定义函数决定。对输出的数据一个个进行处理操作。
使用方式主要有二种
在这里插入图片描述
方式一:

  1. import os
  2. from pyspark.sql import SparkSession
  3. import pyspark.sql.functions as F
  4. # 绑定指定的Python解释器
  5. os.environ['SPARK_HOME']='/export/server/spark'
  6. os.environ['PYSPARK_PYTHON']='/root/anaconda3/bin/python3'
  7. os.environ['PYSPARK_DRIVER_PYTHON']='/root/anaconda3/bin/python3'if __name__ =='__main__':# 1- 创建SparkSession对象
  8. spark = SparkSession.builder\
  9. .config("spark.sql.shuffle.partitions",1)\
  10. .appName('foreach_sink')\
  11. .master('local[*]')\
  12. .getOrCreate()# 2- 数据输入
  13. init_df = spark.readStream\
  14. .format("socket")\
  15. .option("host","192.168.88.161")\
  16. .option("port","55555")\
  17. .load()# 3- 数据处理# 4- 数据输出# 5- 启动流式任务"""
  18. 每输入一条数据都会调用foreach中的函数。
  19. 取Row中某个字段的值:
  20. 方式一 row.字段名
  21. 方式二 row['字段名']
  22. """defmy_foreach_func(row):# 打开数据库连接# 存储到数据库中print(row,row.value)print(row['value'])# 关闭数据库连接
  23. init_df.writeStream.foreach(my_foreach_func).outputMode("append").start().awaitTermination()

方式二:这种方式的适用场景是需要和资源打交道的情况(例如:连接和关闭数据库、打开关闭文件等)。通过open和close来处理资源,通过process来对数据进行自定义的处理

  1. import os
  2. from pyspark.sql import SparkSession
  3. import pyspark.sql.functions as F
  4. # 绑定指定的Python解释器
  5. os.environ['SPARK_HOME']='/export/server/spark'
  6. os.environ['PYSPARK_PYTHON']='/root/anaconda3/bin/python3'
  7. os.environ['PYSPARK_DRIVER_PYTHON']='/root/anaconda3/bin/python3'if __name__ =='__main__':# 1- 创建SparkSession对象
  8. spark = SparkSession.builder\
  9. .config("spark.sql.shuffle.partitions",1)\
  10. .appName('foreach_sink')\
  11. .master('local[*]')\
  12. .getOrCreate()# 2- 数据输入
  13. init_df = spark.readStream\
  14. .format("socket")\
  15. .option("host","192.168.88.161")\
  16. .option("port","55555")\
  17. .load()# 3- 数据处理# 4- 数据输出# 5- 启动流式任务# 自定义类"""
  18. foreach sink方式二
  19. 1- 必须要创建一个自定义类,类中必须要有3个方法:open、process、close,方法名称、形参不能改变
  20. 2- open中代码执行完成以后,返回值类型是bool类型。如果返回False,那么不会执行process方法;只有返回True的时候才会执行process方法
  21. 3- 一般open用来对资源进行初始化;process使用相关资源对数据进行自定义的处理;close用来对资源进行释放
  22. 4- 一般会和trigger一起配合使用,用来减少消耗资源的操作
  23. """classMyForeachFunc:defopen(self,partition_id, epoch_id):print(f"partition_id={partition_id},epoch_id={epoch_id}")# 打开数据库连接print("打开数据库连接")returnTruedefprocess(self,row):# 存储到数据库中print(row, row.value)defclose(self, error):# 关闭数据库连接print("关闭数据库连接")
  24. init_df.writeStream\
  25. .foreach(MyForeachFunc())\
  26. .trigger(processingTime="20 seconds")\
  27. .outputMode("append")\
  28. .start()\
  29. .awaitTermination()
  1. 说明:
  2. open: 在一个批次中只会调用一次。返回值是bool类型,当返回True的时候,process方法才会被调用
  3. process: 会被调用多次,该批次内有多少行数据,就会被调用多少次
  4. close: 在一个批次中只会调用一次。用来关闭在open打开的资源
  5. 1- 必须要创建一个自定义类,类中必须要有3个方法:openprocessclose,方法名称、形参不能改变
  6. 2- open中代码执行完成以后,返回值类型是bool类型。如果返回False,那么不会执行process方法;只有返回True的时候才会执行process方法
  7. 3- 一般open用来对资源进行初始化;process使用相关资源对数据进行自定义的处理;close用来对资源进行释放
  8. 4- 一般会和trigger一起配合使用,用来减少消耗资源的操作

foreachBatch Sink
在这里插入图片描述

  1. import os
  2. from pyspark.sql import SparkSession
  3. import pyspark.sql.functions as F
  4. # 绑定指定的Python解释器
  5. os.environ['SPARK_HOME']='/export/server/spark'
  6. os.environ['PYSPARK_PYTHON']='/root/anaconda3/bin/python3'
  7. os.environ['PYSPARK_DRIVER_PYTHON']='/root/anaconda3/bin/python3'if __name__ =='__main__':# 1- 创建SparkSession对象
  8. spark = SparkSession.builder\
  9. .config("spark.sql.shuffle.partitions",1)\
  10. .appName('foreach_sink')\
  11. .master('local[*]')\
  12. .getOrCreate()# 2- 数据输入
  13. init_df = spark.readStream\
  14. .format("socket")\
  15. .option("host","192.168.88.161")\
  16. .option("port","55555")\
  17. .load()# 3- 数据处理# 4- 数据输出# 5- 启动流式任务"""
  18. batch_df:有界的DataFrame,可以调用show()方法
  19. batch_id:批次ID
  20. """defmy_foreach_batch_func(batch_df, batch_id):
  21. batch_df.show()print(batch_id)
  22. init_df.writeStream.foreachBatch(my_foreach_batch_func).outputMode("append").start().awaitTermination()
  1. 说明: process_fun(batch_df, batch_id)
  2. batch_df: 该批次中的数据形成的有界DataFrame
  3. batch_id: 批次的编号

设置触发器Trigger

触发器Trigger:决定多久执行一次操作并且输出结果。也就是在结构化流中,处理完一批数据以后,等待一会,再处理下一批数据

主要提供如下几种触发器:

  • 1- 默认方案:也就是不使用触发器的情况。如果没有明确指定,那么结构化流会自动进行决策每一个批次的大小。在运行过程中,会尽可能让每一个批次间的间隔时间变得更短(常用)
  1. result_df.writeStream.foreachBatch(func)\
  2. .outputMode('append')\
  3. .start()\
  4. .awaitTermination()
  • 2- 配置固定的时间间隔:在结构化流运行的过程中,当一批数据处理完以后,下一批数据需要等待一定的时间间隔才会进行处理(常用,推荐使用)
  1. result_df.writeStream.foreachBatch(func)\
  2. .outputMode('append')\
  3. .trigger(processingTime='5 seconds')\
  4. .start()\
  5. .awaitTermination()
  6. 情形说明:
  7. 1- 上一批次的数据在时间间隔内处理完成了,那么会等待我们配置触发器固定的时间间隔结束,才会开始处理下一批数据
  8. 2- 上一批次的数据在固定时间间隔结束的时候才处理完成,那么下一批次会立即被处理,不会等待
  9. 3- 上一批次的数据在固定时间间隔内没有处理完成,那么下一批次会等待上一批次处理完成以后立即开始处理,不会等待
  • 3- 仅此一次:在运行的过程中,程序只需要执行一次,然后就退出。这种方式适用于进行初始化操作,以及关闭资源等
  1. result_df.writeStream.foreachBatch(func)\
  2. .outputMode('append')\
  3. .trigger(once=True)\
  4. .start()\
  5. .awaitTermination()

Checkpoint检查点目录设置

设置检查点,目的是为了提供容错性。当程序出现失败了,可以从检查点的位置,直接恢复处理即可。避免出现重复处理的问题

检查点目录主要包含以下几个目录位置:
在这里插入图片描述

  1. 1-偏移量offsets: 记录每个批次中的偏移量。为了保证给定的批次始终包含相同的数据。在处理数据之前会将offset信息写入到该目录
  2. 2-提交记录commits: 记录已经处理完成的批次。重启任务的时候会检查完成的批次和offsets目录中批次的记录进行对比。确定接下来要处理的批次
  3. 3-元数据文件metadata: 和整个查询关联的元数据信息,目前只保留当前的job id
  4. 4-数据源sources: 是数据源(Source)各个批次的读取的详情
  5. 5-数据接收端sinks: 是数据接收端各个批次的写出的详情
  6. 6-状态state: 当有状态操作的时候,例如:累加、聚合、去重等操作场景,这个目录会用来记录这些状态数据。根据配置周期性的生成。snapshot文件用于记录状态

如何设置检查点:

  1. 1- SparkSession.conf.set("spark.sql.streaming.checkpointLocation", "检查点路径")
  2. 2- option("checkpointLocation", "检查点路径")
  3. 推荐: 检查点路径支持本地和HDFS。推荐使用HDFS路径

Spark和Kafka整合

从kafka中读取数据

spark和kafka集成官网文档:
https://spark.apache.org/docs/3.1.2/structured-streaming-kafka-integration.html

流式处理

官方给出方案:

  1. # 订阅Kafka的一个Topic,从最新的消息数据开始消费
  2. df = spark \
  3. .readStream \
  4. .format("kafka") \
  5. .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  6. .option("subscribe", "topic1") \
  7. .load()
  8. df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  9. # 订阅Kafka的多个Topic,多个Topic间使用英文逗号进行分隔。从最新的消息数据开始消费
  10. df = spark \
  11. .readStream \
  12. .format("kafka") \
  13. .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  14. .option("subscribe", "topic1,topic2") \
  15. .load()
  16. df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  17. # 订阅一个Topic,并且指定header信息
  18. df = spark \
  19. .readStream \
  20. .format("kafka") \
  21. .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  22. .option("subscribe", "topic1") \
  23. .option("includeHeaders", "true") \
  24. .load()
  25. df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
  26. # 订阅符合规则的Topic,从最新的数据开始消费
  27. df = spark \
  28. .readStream \
  29. .format("kafka") \
  30. .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  31. .option("subscribePattern", "topic.*") \
  32. .load()
  33. df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

从某一个Topic中读取消息数据:

  1. import os
  2. from pyspark.sql import SparkSession
  3. import pyspark.sql.functions as F
  4. # 绑定指定的Python解释器
  5. os.environ['SPARK_HOME']='/export/server/spark'
  6. os.environ['PYSPARK_PYTHON']='/root/anaconda3/bin/python3'
  7. os.environ['PYSPARK_DRIVER_PYTHON']='/root/anaconda3/bin/python3'if __name__ =='__main__':# 1- 创建SparkSession对象
  8. spark = SparkSession.builder\
  9. .config("spark.sql.shuffle.partitions",1)\
  10. .appName('ss_read_kafka_1_topic')\
  11. .master('local[*]')\
  12. .getOrCreate()# 2- 数据输入# 默认从最新的地方开始消费
  13. init_df = spark.readStream\
  14. .format("kafka")\
  15. .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
  16. .option("subscribe","test01")\
  17. .load()# 3- 数据处理
  18. result_df1 = init_df.select(F.expr("cast(value as string) as value"))# selectExpr = select + F.expr
  19. result_df2 = init_df.selectExpr("cast(value as string) as value")
  20. result_df3 = init_df.withColumn("value",F.expr("cast(value as string)"))# 4- 数据输出# 5- 启动流式任务"""
  21. 如果有多个输出,那么只能在最后一个start的后面写awaitTermination()
  22. """
  23. result_df1.writeStream.queryName("result_df1").format("console").outputMode("append").start()
  24. result_df2.writeStream.queryName("result_df2").format("console").outputMode("append").start()
  25. result_df3.writeStream.queryName("result_df3").format("console").outputMode("append").start().awaitTermination()

按规则订阅Topic:

  1. import os
  2. from pyspark.sql import SparkSession
  3. import pyspark.sql.functions as F
  4. # 绑定指定的Python解释器
  5. os.environ['SPARK_HOME']='/export/server/spark'
  6. os.environ['PYSPARK_PYTHON']='/root/anaconda3/bin/python3'
  7. os.environ['PYSPARK_DRIVER_PYTHON']='/root/anaconda3/bin/python3'if __name__ =='__main__':# 1- 创建SparkSession对象
  8. spark = SparkSession.builder\
  9. .config("spark.sql.shuffle.partitions",1)\
  10. .appName('ss_read_kafka_multi_topic')\
  11. .master('local[*]')\
  12. .getOrCreate()# 2- 数据输入# 默认从最新的地方开始消费
  13. init_df = spark.readStream\
  14. .format("kafka")\
  15. .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
  16. .option("subscribePattern","test.*")\
  17. .load()# 3- 数据处理
  18. result_df1 = init_df.select("topic",F.expr("cast(value as string) as value"))# selectExpr = select + F.expr
  19. result_df2 = init_df.selectExpr("topic","cast(value as string) as value")
  20. result_df3 = init_df.withColumn("value",F.expr("cast(value as string)"))# 4- 数据输出# 5- 启动流式任务"""
  21. 如果有多个输出,那么只能在最后一个start的后面写awaitTermination()
  22. """
  23. result_df1.writeStream.queryName("result_df1").format("console").outputMode("append").start()
  24. result_df2.writeStream.queryName("result_df2").format("console").outputMode("append").start()
  25. result_df3.writeStream.queryName("result_df3").format("console").outputMode("append").start().awaitTermination()

对接kafka后,返回的结果数据内容:

  1. key: 发送数据的key值。如果没有,就为null
  2. value: 最重要的字段。发送数据的value值,也就是消息内容。如果没有,就为null
  3. topic: 表示消息是从哪个Topic中消费出来
  4. partition: 分区编号。表示消费到的该条数据来源于Topic的哪个分区
  5. offset: 表示消息偏移量
  6. timestamp: 接收的时间戳
  7. timestampType: 时间戳类型(无意义)

类型的说明:
列名类型keybinaryvaluebinarytopicstringpartitionintoffsetlongtimestamptimestamptimestampTypeintheaders (optional)array

批处理

官方给出方案:

  1. # 订阅一个Topic主题, 默认从最早到最晚的偏移量范围
  2. df = spark \
  3. .read \
  4. .format("kafka") \
  5. .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  6. .option("subscribe", "topic1") \
  7. .load()
  8. df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  9. # 批处理订阅Kafka的多个Topic数据。并且可以通过startingOffsets和endingOffsets指定要消费的消息偏移
  10. 量(offset)范围。"topic1":{"0":23,"1":-2} 含义是:topic1"0":23从分区编号为0的分区的
  11. offset=23地方开始消费,"1":-2 从分区编号为1的分区的最开始的地方开始消费
  12. df = spark \
  13. .read \
  14. .format("kafka") \
  15. .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  16. .option("subscribe", "topic1,topic2") \
  17. .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
  18. .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
  19. .load()
  20. df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  21. # 通过正则匹配多个Topic, 默认从最早到最晚的偏移量范围
  22. df = spark \
  23. .read \
  24. .format("kafka") \
  25. .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  26. .option("subscribePattern", "topic.*") \
  27. .option("startingOffsets", "earliest") \
  28. .option("endingOffsets", "latest") \
  29. .load()
  30. df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

订阅一个Topic:

  1. import os
  2. from pyspark.sql import SparkSession
  3. import pyspark.sql.functions as F
  4. # 绑定指定的Python解释器
  5. os.environ['SPARK_HOME']='/export/server/spark'
  6. os.environ['PYSPARK_PYTHON']='/root/anaconda3/bin/python3'
  7. os.environ['PYSPARK_DRIVER_PYTHON']='/root/anaconda3/bin/python3'if __name__ =='__main__':# 1- 创建SparkSession对象
  8. spark = SparkSession.builder\
  9. .config("spark.sql.shuffle.partitions",1)\
  10. .appName('sparksql_read_kafka_1_topic')\
  11. .master('local[*]')\
  12. .getOrCreate()# 2- 数据输入# 默认从Topic开头一直消费到结尾
  13. init_df = spark.read\
  14. .format("kafka")\
  15. .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
  16. .option("subscribe","test02")\
  17. .load()# 3- 数据处理
  18. result_df1 = init_df.select(F.expr("cast(value as string) as value"))# selectExpr = select + F.expr
  19. result_df2 = init_df.selectExpr("cast(value as string) as value")
  20. result_df3 = init_df.withColumn("value",F.expr("cast(value as string)"))# 4- 数据输出print("result_df1")
  21. result_df1.show()print("result_df2")
  22. result_df2.show()print("result_df3")
  23. result_df3.show()# 5- 释放资源
  24. spark.stop()

指定startingOffsets参数:

  1. import os
  2. from pyspark.sql import SparkSession
  3. import pyspark.sql.functions as F
  4. # 绑定指定的Python解释器
  5. os.environ['SPARK_HOME']='/export/server/spark'
  6. os.environ['PYSPARK_PYTHON']='/root/anaconda3/bin/python3'
  7. os.environ['PYSPARK_DRIVER_PYTHON']='/root/anaconda3/bin/python3'if __name__ =='__main__':# 1- 创建SparkSession对象
  8. spark = SparkSession.builder\
  9. .config("spark.sql.shuffle.partitions",1)\
  10. .appName('sparksql_read_kafka_multi_topic')\
  11. .master('local[*]')\
  12. .getOrCreate()# 2- 数据输入# 默认从Topic开头一直消费到结尾# 对每个分区指定具体消费的offset,了解即可。实际工作很少用# init_df = spark.read\# .format("kafka")\# .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\# .option("subscribe","test02,test03")\# .option("startingOffsets","""{"test02":{"0":1}}""")\# .load()
  13. init_df = spark.read \
  14. .format("kafka") \
  15. .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092") \
  16. .option("subscribe","test02,test03") \
  17. .option("startingOffsets","earliest") \
  18. .load()# 3- 数据处理
  19. result_df1 = init_df.select(F.expr("cast(value as string) as value"))# selectExpr = select + F.expr
  20. result_df2 = init_df.selectExpr("cast(value as string) as value")
  21. result_df3 = init_df.withColumn("value",F.expr("cast(value as string)"))# 4- 数据输出print("result_df1")
  22. result_df1.show()print("result_df2")
  23. result_df2.show()print("result_df3")
  24. result_df3.show(n=100)# 5- 释放资源
  25. spark.stop()

可能遇到的错误:
在这里插入图片描述

  1. 原因: 如果有指定startingOffsets或者endingOffsets,需要指定所有分区的offset
  2. -1: latest,最新的地方
  3. -2: earliest,最旧的地方

必备参数:
选项值说明assign通过一个Json 字符串的方式来表示: {“topicA”:[0,1],“topicB”:[2,4]}设置使用特定的TopicPartitionssubscribe以逗号分隔的Topic主题列表要订阅的主题列表subscribePattern正则表达式字符串订阅匹配符合条件的Topic。assign、subscribe、subscribePattern任意指定一个。kafka.bootstrap.servers以英文逗号分隔的host:port列表指定kafka服务的地址

数据写入Kafka中

流式处理

官方给出方案:

  1. # 将Key和Value的数据都写入到Kafka当中
  2. ds = df \
  3. .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  4. .writeStream \
  5. .format("kafka") \
  6. .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  7. .option("topic", "topic1") \
  8. .start()
  9. # 将Key和Value的数据都写入到Kafka当中。使用DataFrame数据中的Topic字段来指定要将数据写入到Kafka集群
  10. 的哪个Topic中。这种方式适用于消费多个Topic的情况
  11. ds = df \
  12. .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  13. .writeStream \
  14. .format("kafka") \
  15. .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  16. .start()

写出到指定Topic

  1. import os
  2. from pyspark.sql import SparkSession
  3. import pyspark.sql.functions as F
  4. # 绑定指定的Python解释器
  5. os.environ['SPARK_HOME']='/export/server/spark'
  6. os.environ['PYSPARK_PYTHON']='/root/anaconda3/bin/python3'
  7. os.environ['PYSPARK_DRIVER_PYTHON']='/root/anaconda3/bin/python3'if __name__ =='__main__':# 1- 创建SparkSession对象
  8. spark = SparkSession.builder\
  9. .config("spark.sql.shuffle.partitions",1)\
  10. .appName('ss_read_kafka_1_topic')\
  11. .master('local[*]')\
  12. .getOrCreate()# 2- 数据输入# 默认从最新的地方开始消费
  13. init_df = spark.readStream\
  14. .format("kafka")\
  15. .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
  16. .option("subscribe","test01")\
  17. .load()# 3- 数据处理
  18. result_df = init_df.select(F.expr("concat(cast(value as string),'_itheima') as value"))# 4- 数据输出# 5- 启动流式任务
  19. result_df.writeStream.format("kafka")\
  20. .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
  21. .option("topic","test02")\
  22. .option("checkpointLocation","hdfs://node1:8020/chk")\
  23. .start()\
  24. .awaitTermination()

从数据内容中解析得到Topic,然后写入Kafka:

  1. import os
  2. from pyspark.sql import SparkSession
  3. import pyspark.sql.functions as F
  4. # 绑定指定的Python解释器
  5. os.environ['SPARK_HOME']='/export/server/spark'
  6. os.environ['PYSPARK_PYTHON']='/root/anaconda3/bin/python3'
  7. os.environ['PYSPARK_DRIVER_PYTHON']='/root/anaconda3/bin/python3'if __name__ =='__main__':# 1- 创建SparkSession对象
  8. spark = SparkSession.builder\
  9. .config("spark.sql.shuffle.partitions",1)\
  10. .appName('ss_read_kafka_multi_topic')\
  11. .master('local[*]')\
  12. .getOrCreate()# 2- 数据输入# 默认从最新的地方开始消费
  13. init_df = spark.readStream\
  14. .format("kafka")\
  15. .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
  16. .option("subscribePattern","test.*")\
  17. .load()# 3- 数据处理# 错误写法:缺少topic字段# result_df = init_df.select(F.expr("topic as new_topic"),F.expr("concat(cast(value as string),'_',topic) as value"))
  18. result_df = init_df.select("topic",F.expr("concat(cast(value as string),'_',topic) as value"))# 4- 数据输出# 5- 启动流式任务
  19. result_df.writeStream.format("console").outputMode("append").start()
  20. result_df.writeStream.format("kafka")\
  21. .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
  22. .option("checkpointLocation","hdfs://node1:8020/chk")\
  23. .start()\
  24. .awaitTermination()

可能遇到的错误一:
在这里插入图片描述

  1. 原因: 当从数据中解析得到Topic信息的时候,最终输出到Kafka的那个DataFrame中必须要有topic

可能遇到的错误二:
在这里插入图片描述

  1. 原因: 输出到Kafka中的数据,需要命名value。而且数据类型需要是string或者binary(二进制)

备注Column数据类型可选字段keystring or binary必填字段valuestring or binary可选字段headersstring or binary必填字段topicstring可选字段partitionint

批处理

官方给出方案:

  1. # 从DataFrame中写入key-value数据到一个选项中指定的特定Kafka topic中
  2. df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  3. .write \
  4. .format("kafka") \
  5. .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  6. .option("topic", "topic1") \
  7. .save()
  8. # 使用数据中指定的主题将key-value数据从DataFrame写入Kafka
  9. df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  10. .write \
  11. .format("kafka") \
  12. .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  13. .save()
  1. import os
  2. from pyspark.sql import SparkSession
  3. import pyspark.sql.functions as F
  4. # 绑定指定的Python解释器
  5. os.environ['SPARK_HOME']='/export/server/spark'
  6. os.environ['PYSPARK_PYTHON']='/root/anaconda3/bin/python3'
  7. os.environ['PYSPARK_DRIVER_PYTHON']='/root/anaconda3/bin/python3'if __name__ =='__main__':# 1- 创建SparkSession对象
  8. spark = SparkSession.builder\
  9. .config("spark.sql.shuffle.partitions",1)\
  10. .appName('ss_read_kafka_1_topic')\
  11. .master('local[*]')\
  12. .getOrCreate()# 2- 数据输入# 默认从最新的地方开始消费
  13. init_df = spark.read\
  14. .format("kafka")\
  15. .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
  16. .option("subscribe","test02")\
  17. .load()# 3- 数据处理
  18. result_df = init_df.select(F.expr("concat(cast(value as string),'_itheima') as value"))# 4- 数据输出# 5- 启动流式任务
  19. result_df.write.format("kafka")\
  20. .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
  21. .option("topic","test02")\
  22. .option("checkpointLocation","hdfs://node1:8020/chk")\
  23. .save()

备注Column数据类型可选字段keystring or binary必填字段valuestring or binary可选字段headersarray必填字段topicstring可选字段partitionint

标签: kafka 分布式 spark

本文转载自: https://blog.csdn.net/qq_50215015/article/details/135597715
版权归原作者 小希 fighting 所有, 如有侵权,请联系我们删除。

“Structured_Streaming和Kafka整合”的评论:

还没有评论