1、数据模拟器代码
- 1- 创建一个topic, 放置后续物联网的数据 search-log-topic
./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --create --topic search-log-topic --partitions 3 --replication-factor 2
- 2- 将代码放置到项目中:
import json
import random
import sys
import time
import os
from kafka import KafkaProducer
from kafka.errors import KafkaError
锁定远端操作环境, 避免存在多个版本环境的问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ["PYSPARK_PYTHON"] = "/root/anaconda3/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/root/anaconda3/bin/python"
快捷键: main 回车
if name == 'main':
print("模拟物联网数据")
# 1- 构建一个kafka的生产者:
producer = KafkaProducer(
bootstrap_servers=['node1:9092', 'node2:9092', 'node3:9092'],
acks='all',
value_serializer=lambda m: json.dumps(m).encode("utf-8")
)
# 2- 物联网设备类型
deviceTypes = ["洗衣机", "油烟机", "空调", "窗帘", "灯", "窗户", "煤气报警器", "水表", "燃气表"]
while True:
index = random.choice(range(0, len(deviceTypes)))
deviceID = f'device_{index}_{random.randrange(1, 20)}' # 设备ID
deviceType = deviceTypes[index] # 设备类型
deviceSignal = random.choice(range(10, 100)) # 设备信号
# 组装数据集
print({'deviceID': deviceID, 'deviceType': deviceType, 'deviceSignal': deviceSignal,
'time': time.strftime('%s')})
# 发送数据
producer.send(topic='search-log-topic',
value={'deviceID': deviceID, 'deviceType': deviceType, 'deviceSignal': deviceSignal,
'time': time.strftime('%s')}
)
# 间隔时间 5s内随机
time.sleep(random.choice(range(1, 5)))
- 测试, 观察是否可以正常生成:
./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --describe --topic search-log-topic
2、需求说明
目前咱们有一个模拟器程序, 可以向Kafka不断的写入数据
要做的是, 用Spark的结构化流接收数据, 并且对数据进行统计分析操作:
- 求: 各种信号强度>30各种类型的设备数量 和 它们的平均信号强度
需求分析:
1- 需要按照设备类型进行分组,也就是维度是设备类型deviceType
2- 指标
设备数量:deviceID
平均信号强度:deviceSignal
示例数据:
{'deviceID': 'device_1_1', 'deviceType': '油烟机', 'deviceSignal': 23, 'time': '1668848417'} {'deviceID': 'device_0_4', 'deviceType': '洗衣机', 'deviceSignal': 55, 'time': '1668848418'}
deviceID: 设备ID
deviceType: 设备类型
deviceSignal: 设备信号
time : 设备发送时间戳
3、代码实现
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
def sql():
# SQL
# 3.2- 拆解数据结构。将json解析得到单个的字段
"""
get_json_object(参数1,参数2):用来解析json串。一次只能得到一个字段的值
参数1:要解析的json字段名称
参数2:字段的解析路径 $.字段路径
"""
etl_df = spark.sql("""
select
get_json_object(value,'$.deviceID') as deviceID,
get_json_object(value,'$.deviceType') as deviceType,
get_json_object(value,'$.deviceSignal') as deviceSignal,
get_json_object(value,'$.time') as time
from iot
""")
etl_df.createTempView("etl")
# 3.3- 各种信号强度>30各种类型的设备数量 和 它们的平均信号强度
result_df = spark.sql("""
select
deviceType,
count(deviceID) as cnt_deviceID,
round(avg(deviceSignal),2) as avg_deviceSignal
from etl
where deviceSignal>30
group by deviceType
""")
# 4- 数据输出
# 5- 启动流式任务
result_df.writeStream.format('console').outputMode('complete').start().awaitTermination()
def dsl():
result_df = etl_tmp_df.select(
F.get_json_object('value', '$.deviceID').alias('deviceID'),
F.get_json_object('value', '$.deviceType').alias('deviceType'),
F.get_json_object('value', '$.deviceSignal').alias('deviceSignal'),
F.get_json_object('value', '$.time').alias('time')
).where('deviceSignal>30').groupBy('deviceType').agg(
F.count('deviceID').alias('cnt_deviceID'),
F.round(F.avg('deviceSignal'), 2).alias('avg_deviceSignal')
)
# 4- 数据输出
# 5- 启动流式任务
result_df.writeStream.format('console').outputMode('complete').start().awaitTermination()
if __name__ == '__main__':
# 1- 创建SparkSession对象
spark = SparkSession.builder\
.config("spark.sql.shuffle.partitions",2)\
.appName('iot')\
.master('local[*]')\
.getOrCreate()
# 2- 数据输入
init_df = spark.readStream\
.format("kafka") \
.option("kafka.bootstrap.servers", "node1.itcast.cn:9092,node2.itcast.cn:9092") \
.option("subscribe", "search-log-topic") \
.load()
# 3- 数据处理
# 3.1- 数据ETL:进行数据类型转换,将value字段bytes->字符串
etl_tmp_df = init_df.selectExpr("cast(value as string) as value")
etl_tmp_df.createTempView('iot')
# SQL
# sql()
# DSL
dsl()
运行结果截图:
结构化流不支持的操作:
- 多个流同时聚合
- limit和take不能使用
- 不能使用去重操作
- Few types of outer joins on streaming Datasets are not supported. See the support matrix in the Join Operations section for more details.
版权归原作者 困了就倒头睡 所有, 如有侵权,请联系我们删除。