0


摸鱼大数据——Spark Structured Steaming——物联网数据分析案例

1、数据模拟器代码

  • 1- 创建一个topic, 放置后续物联网的数据 search-log-topic
 ./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --create --topic search-log-topic --partitions 3 --replication-factor 2
  • 2- 将代码放置到项目中:

import json
import random
import sys
import time
import os
from kafka import KafkaProducer
from kafka.errors import KafkaError

锁定远端操作环境, 避免存在多个版本环境的问题

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ["PYSPARK_PYTHON"] = "/root/anaconda3/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/root/anaconda3/bin/python"

快捷键: main 回车

if name == 'main':
print("模拟物联网数据")

# 1- 构建一个kafka的生产者:
 producer = KafkaProducer(
     bootstrap_servers=['node1:9092', 'node2:9092', 'node3:9092'],
     acks='all',
     value_serializer=lambda m: json.dumps(m).encode("utf-8")
 )

# 2- 物联网设备类型
 deviceTypes = ["洗衣机", "油烟机", "空调", "窗帘", "灯", "窗户", "煤气报警器", "水表", "燃气表"]

while True:
     index = random.choice(range(0, len(deviceTypes)))
     deviceID = f'device_{index}_{random.randrange(1, 20)}'  # 设备ID
     deviceType = deviceTypes[index]  # 设备类型
     deviceSignal = random.choice(range(10, 100)) # 设备信号

    # 组装数据集
     print({'deviceID': deviceID, 'deviceType': deviceType, 'deviceSignal': deviceSignal,
            'time': time.strftime('%s')})

    # 发送数据
     producer.send(topic='search-log-topic',
                   value={'deviceID': deviceID, 'deviceType': deviceType, 'deviceSignal': deviceSignal,
                                    'time': time.strftime('%s')}
     )

    # 间隔时间 5s内随机
     time.sleep(random.choice(range(1, 5)))
  • 测试, 观察是否可以正常生成:
 ./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --describe --topic search-log-topic

2、需求说明

目前咱们有一个模拟器程序, 可以向Kafka不断的写入数据

要做的是, 用Spark的结构化流接收数据, 并且对数据进行统计分析操作:

  • 求: 各种信号强度>30各种类型的设备数量 和 它们的平均信号强度

需求分析:

1- 需要按照设备类型进行分组,也就是维度是设备类型deviceType

2- 指标

设备数量:deviceID

平均信号强度:deviceSignal

示例数据:

{'deviceID': 'device_1_1', 'deviceType': '油烟机', 'deviceSignal': 23, 'time': '1668848417'} {'deviceID': 'device_0_4', 'deviceType': '洗衣机', 'deviceSignal': 55, 'time': '1668848418'}

 deviceID: 设备ID
 deviceType: 设备类型
 deviceSignal: 设备信号
 time : 设备发送时间戳

3、代码实现

 from pyspark import SparkConf, SparkContext
 import os
 from pyspark.sql import SparkSession
 import pyspark.sql.functions as F
 ​
 # 绑定指定的Python解释器
 os.environ['SPARK_HOME'] = '/export/server/spark'
 os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
 os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
 ​
 ​
 def sql():
     # SQL
     # 3.2- 拆解数据结构。将json解析得到单个的字段
     """
         get_json_object(参数1,参数2):用来解析json串。一次只能得到一个字段的值
             参数1:要解析的json字段名称
             参数2:字段的解析路径 $.字段路径
     """
     etl_df = spark.sql("""
         select
             get_json_object(value,'$.deviceID') as deviceID,
             get_json_object(value,'$.deviceType') as deviceType,
             get_json_object(value,'$.deviceSignal') as deviceSignal,
             get_json_object(value,'$.time') as time
         from iot
     """)
     etl_df.createTempView("etl")
 ​
     # 3.3- 各种信号强度>30各种类型的设备数量  和  它们的平均信号强度
     result_df = spark.sql("""
         select
             deviceType,
             count(deviceID) as cnt_deviceID,
             round(avg(deviceSignal),2) as avg_deviceSignal
         from etl
         where deviceSignal>30
         group by deviceType
     """)
 ​
     # 4- 数据输出
     # 5- 启动流式任务
     result_df.writeStream.format('console').outputMode('complete').start().awaitTermination()
 ​
 ​
 def dsl():
     result_df = etl_tmp_df.select(
         F.get_json_object('value', '$.deviceID').alias('deviceID'),
         F.get_json_object('value', '$.deviceType').alias('deviceType'),
         F.get_json_object('value', '$.deviceSignal').alias('deviceSignal'),
         F.get_json_object('value', '$.time').alias('time')
     ).where('deviceSignal>30').groupBy('deviceType').agg(
         F.count('deviceID').alias('cnt_deviceID'),
         F.round(F.avg('deviceSignal'), 2).alias('avg_deviceSignal')
     )
     
     # 4- 数据输出
     # 5- 启动流式任务
     result_df.writeStream.format('console').outputMode('complete').start().awaitTermination()
 ​
 ​
 if __name__ == '__main__':
     # 1- 创建SparkSession对象
     spark = SparkSession.builder\
         .config("spark.sql.shuffle.partitions",2)\
         .appName('iot')\
         .master('local[*]')\
         .getOrCreate()
 ​
     # 2- 数据输入
     init_df = spark.readStream\
         .format("kafka") \
         .option("kafka.bootstrap.servers", "node1.itcast.cn:9092,node2.itcast.cn:9092") \
         .option("subscribe", "search-log-topic") \
         .load()
 ​
     # 3- 数据处理
     # 3.1- 数据ETL:进行数据类型转换,将value字段bytes->字符串
     etl_tmp_df = init_df.selectExpr("cast(value as string) as value")
     etl_tmp_df.createTempView('iot')
 ​
     # SQL
     # sql()
 ​
     # DSL
     dsl()

运行结果截图:

结构化流不支持的操作:

  • 多个流同时聚合
  • limit和take不能使用
  • 不能使用去重操作
  • Few types of outer joins on streaming Datasets are not supported. See the support matrix in the Join Operations section for more details.

本文转载自: https://blog.csdn.net/weixin_65694308/article/details/140453328
版权归原作者 困了就倒头睡 所有, 如有侵权,请联系我们删除。

“摸鱼大数据——Spark Structured Steaming——物联网数据分析案例”的评论:

还没有评论