作者:禅与计算机程序设计艺术
1.简介
Apache Spark Streaming 是 Apache Spark 提供的一个用于高吞吐量、容错的流式数据处理引擎。它可以实时的接收数据并在系统内部以微批次的方式进行处理,并将结果输出到文件、数据库或实时消息系统中。Spark Streaming 支持 Java、Scala 和 Python 编程语言。本文将详细介绍 Spark Streaming 的相关原理及功能特性,包括其核心概念和术语、架构设计、主要组件及应用场景等。最后,通过实际案例展示如何在 Hadoop Yarn 上部署和运行 Spark Streaming 流程,并对比 Spark Structured Streaming 对实时流式数据分析的优缺点。
文章假定读者具有一定的编程能力,并且熟悉 Hadoop MapReduce 或 Spark 基本的 API 操作。对于 Java 开发人员来说,还需要掌握 Java 多线程编程模型和集合框架等知识。
2.基本概念和术语
2.1 Apache Spark Streaming概述
Apache Spark Streaming 是 Apache Spark 提供的一个用于高吞吐量、容错的流式数据处理引擎。它可以实时的接收数据并在系统内部以微批次的方式进行处理,并将结果输出到文件、数据库或实时消息系统中。Spark Streaming 使用反应式数据流(Reactive Data Stream)编程模型,它允许对实时输入的数据进行快速地、批量地、增量地处理。
Spark Streaming 可以同时支持离线数据处理和实时数据处理。它的输入数据来源可以是任何可被 Spark 支持的文件格式,比如 HDFS、Kafka、Flume、Kinesis、TCP Socket、或从其他消息队列读取。然后,Spark Streaming 会对这些输入数据进行预处理,以便让数据能够按照微批次的形式传递给后续的处理阶段。在后续处理阶段,Spark Streaming 会对每个微批次的数据进行转换、过滤、聚合等操作,最终输出到用户指定的目的地。
Spark Streaming 在系统架构上有以下几个重要的组成部分:
- 数据源:来自于外部的实时数据源,如 Kafka、Flume、Kinesis、TCP Socket、MQTT 或 RabbitMQ。
- 数据采集器:从数据源接收数据的驱动程序模块,它会启动一个独立的 JVM 来执行数据收集任务。
- 消费者群组:由多个消费者线程组成的线程池,它们负责消费微批次的数据并执行处理操作。
- 状态存储服务:Spark Streaming 会将每个窗口期间的统计信息保存在内存中或者磁盘中,用作持久化存储和容错恢复。
- 计算引擎:Spark Streaming 的核心,它根据 DStream 中的数据和相关操作构建数据管道,并运行每个微批次的数据处理逻辑。
- 输出控制:决定何时将结果数据写入到外部存储系统,例如,输出到 HDFS、Kafka、数据库、消息队列等。
Spark Streaming 与其它流处理系统相比,最大的不同之处在于它的计算模型是微批处理模式,而不是基于时间或事件触发的方式。这种微批处理模式意味着即使在有限的计算资源下也能提供低延迟的实时数据处理能力。因此,Spark Streaming 比传统的离线批处理系统更加适合对实时数据进行分析。
2.2 基本概念
2.2.1 数据源(Data Sources)
数据源是一个广义上的概念,指的是输入到 Spark Streaming 的源头,可以是源于网络、磁盘、数据库、应用日志、或来自其他实时数据源。Spark Streaming 支持多种数据源类型,包括文件、TCP socket、Kafka、Flume、Kinesis、及其它消息队列。
2.2.2 数据采集器(Input DStreams)
数据采集器(Input DStreams)是一个不可变的 DStream,它代表了输入数据源的数据流。当调用 SparkContext.stream() 方法创建 DStream 时,就会自动创建相应的 Input DStreams。数据采集器会启动一个单独的 JVM 来执行数据采集任务,把数据发送到驱动程序所在的 JVM 中,数据采集器负责接收外部源的输入数据。
2.2.3 消费者群组(Consumer Groups)
消费者群组是由多个消费者线程组成的线程池,它们负责消费微批次的数据并执行处理操作。消费者群组保证数据按顺序且不会丢失,并且可以允许某些消费者失败而不影响整个流处理过程。一般情况下,Spark Streaming 每个输入 DStream 都会分配一个默认的消费者群组,但是也可以指定自定义的消费者群组。
2.2.4 DStream 操作(Operations on DStreams)
DStream 提供了一系列的操作方法,比如 map、flatMap、filter、reduceByKey 等。这些方法提供了丰富的转换和处理能力,可以对实时数据进行快速、复杂、并行地处理。DStream 操作分为两类:内置操作和用户定义操作。
内置操作:这些操作都是由 Spark 系统直接提供的,可以在任意的 DStream 上执行,比如 filter、map、flatMap、join、union 等。这些操作一般情况下性能比较高效,但只能使用简单的函数来实现。
用户定义操作:用户可以通过两种方式定义自己的 DStream 操作。第一种方式是在 Scala、Java 或 Python 代码中调用 transformation() 或 transform() 函数,这两个函数都可以对 DStream 执行任意的用户自定义操作。第二种方式是继承 DStreamTransformations 类,并重写 compute() 函数,然后调用 DStream 对象上的 transform() 函数。
除了提供数据处理能力外,DStream 操作还可以与状态流动结合起来,形成状态变化的流处理模型。
2.2.5 微批次(Microbatches)
Spark Streaming 将数据处理分成一系列的微批次(microbatch),这样就可以在短暂的时间间隔内完成处理。每一次微批次处理结束后,Spark Streaming 会把结果输出到目标系统。微批次的大小取决于 Spark Streaming 应用程序的吞吐量需求、应用程序中使用的机器的数量和配置、输入数据量的大小、以及数据处理的复杂程度。微批次的大小一般设置为几百KB到几MB之间。
2.2.6 窗口(Windows)
Spark Streaming 以微批次的方式处理数据,但它并不是处理所有数据,而是只处理最近的一段时间的输入数据。这种窗口机制是 Spark Streaming 最重要的特性之一,它允许实时流式数据处理模型中的延迟和滑动窗口。窗口一般设置为几秒到几分钟。当某个窗口期结束时,Spark Streaming 会计算该窗口中的结果数据,并清除窗口中的旧数据,继续等待下一个新窗口的数据。
2.2.7 依赖关系(Dependencies)
依赖关系是指不同 DStream 操作之间的依赖关系。依赖关系一般是指前一个操作的输出作为后一个操作的输入。如果前面的操作发生改变,那么后面操作也要重新执行。Spark Streaming 有两种类型的依赖关系。
- 窄依赖:一个 DStream 只能跟随一个操作,即只有一个父节点;
- 深度依赖:一个 DStream 可能跟随多个操作,即有多个父节点。
宽依赖会导致数据重复计算,不利于性能优化;而深度依赖会导致复杂的调度和协调,增加了处理数据的复杂性。
2.2.8 RDD 持久化(RDD Persistence)
Spark Streaming 采用持久化策略来缓存已经处理过的数据,以提高实时数据处理的性能。Spark Streaming 使用两种持久化策略。
- MEMORY_ONLY:仅保存在内存中,也就是说 Spark Streaming 只会将数据保存在内存中,然后再写入磁盘;
- MEMORY_AND_DISK:既可以保存在内存中,也可以在磁盘上持久化。
2.2.9 检测实时数据中的异常值(Detecting Anomalies in Real-time Data)
实时数据中经常会出现各种异常值,比如空值、零值、无效值、超出范围的值等。为了检测实时数据中的异常值,Spark Streaming 提供了一个名为滑动窗口计数器的方法。这种方法利用滑动窗口来统计一定时间间隔内输入数据中特定值的数量,然后再利用统计结果来判断是否有异常值。
2.2.10 流水线(Pipelines)
流水线是指多个 DStream 操作的连接组合,它描述了实时数据处理流程。流水线通常包括数据源、数据采集器、消费者群组、DStream 操作、输出控制等元素。
3.Spark Streaming 架构设计
3.1 Spark Streaming 架构图
Spark Streaming 整体架构由四个主要组件组成:Driver、Executor、Cluster Manager、Streaming Context。其中,Streaming Context 负责管理实时流式处理流程,它和 Cluster Manager 建立联系,获取 Executor 的资源信息,向 Driver 发送数据源的位置信息,并监控运行状况。Driver 根据 Streaming Context 的要求启动多个 Executor,并根据调度策略对数据流进行切分和调度,将数据流输送到不同的 Executor 上进行计算。Executor 则是实际运行计算的实体,每个 Executor 都能处理数据流的一个子集。当有新的数据进入数据源时,Spark Streaming 就会创建一个 DStream,并将数据流导入到 DStream 所在的 Executor 中,供计算使用。
3.2 Spark Streaming 模型结构
Spark Streaming 模型结构由三个层级构成:DStream、DataFrame/DataSet、SQL。
DStream(弹性数据流)是 Spark Streaming 中最基本的数据抽象,它代表着实时数据流的连续序列。DStream 可以从各种数据源生成,比如 Kafka、Flume、Kinesis、TCP Socket、文件系统、或通过实时数据源生成,如 Kafka Streaming。DStream 通过一系列的 transformations 和 actions,可以进行各种操作,比如 filtering、transforming、aggregating 等。
DataFrame/DataSet 作为 Spark SQL 中的数据抽象,它类似于关系型数据库中的表格。它也可以从各种数据源生成,比如 Kafka、Flume、Kinesis、TCP Socket、文件系统等。DataFrame/DataSet 可以用来对数据进行更高级的操作,比如 joining、filtering、grouping 等。
SQL 是 Spark SQL 中的查询语言,它可以用于查询 DataFrame/DataSet 中的数据。SQL 查询的结果可以进一步转化为 DataFrame/DataSet。
4.Spark Streaming 特点
4.1 高吞吐量
Spark Streaming 可以在微批次的基础上,以较低的延迟和较高的吞吐量处理实时数据。微批次的大小取决于硬件资源、网络带宽、数据量、数据处理任务的复杂程度等,但它一般以 10s 或更短的时间片为宜。Spark Streaming 的优势之一就是它能处理海量数据,而且不需要等待大量数据积累到一定程度才开始处理。
4.2 容错性
由于 Spark Streaming 是微批处理模式,所以它具有很好的容错性。如果某个节点出现故障,它只会影响当前正在处理的微批次数据,其他微批次的数据会被接纳到正常运行的集群中。由于微批次处理模式下的容错性,Spark Streaming 可以轻松应对因节点故障、网络拥塞等问题而造成的暂时性中断。
4.3 动态调整
Spark Streaming 能够适应数据源的变化,能够动态调整数据处理规模,并自动平衡集群资源,提升数据处理的弹性。举个例子,如果数据源中新增数据,Spark Streaming 可以自动扩大集群规模来处理新增的数据。Spark Streaming 能够自动发现异常节点并重新启动,从而减少中断造成的数据丢失风险。
4.4 迭代计算
Spark Streaming 具有良好的迭代计算能力。它通过微批次处理模式来保证数据处理的一致性和完整性,同时也能支持灵活的迭代计算模式,能够方便地进行模型训练、参数更新、异常检测等。
5.Spark Streaming 应用场景
5.1 IoT 数据分析
IoT (Internet of Things,物联网) 是指利用互联网技术、通讯协议、应用程序平台、路由器、智能终端、传感器、控制器等一系列技术和工具,实现远程分布式管理和控制物理世界中的物体、系统和服务的一种技术。Spark Streaming 可用于对 IoT 设备产生的大量数据进行实时分析,并根据分析结果做出相关决策。
5.2 用户行为分析
用户行为分析是对网站或 App 的用户行为进行细粒度、连续、及时、全面的分析,帮助公司了解用户的喜好、习惯和偏好,从而改善产品质量、提升营收、降低运营成本等。Spark Streaming 可用于对用户行为数据进行实时分析,进行相关的分析、挖掘、关联、过滤等,并实时生成报表、警报等。
5.3 实时推荐系统
实时推荐系统是一个基于内容的推荐系统,它不断地根据用户的行为习惯、兴趣和喜好来推荐商品和服务。Spark Streaming 可用于实时处理用户行为数据,基于内容的推荐系统将会实时更新推荐结果。
6.Spark Streaming 与 Structured Streaming 对比
6.1 Spark Streaming 架构
Spark Streaming 的架构如上所示,由四个主要组件构成:Driver、Executor、Cluster Manager、Streaming Context。Streaming Context 负责管理实时流式处理流程,它和 Cluster Manager 建立联系,获取 Executor 的资源信息,向 Driver 发送数据源的位置信息,并监控运行状况。Driver 根据 Streaming Context 的要求启动多个 Executor,并根据调度策略对数据流进行切分和调度,将数据流输送到不同的 Executor 上进行计算。Executor 则是实际运行计算的实体,每个 Executor 都能处理数据流的一个子集。当有新的数据进入数据源时,Spark Streaming 就会创建一个 DStream,并将数据流导入到 DStream 所在的 Executor 中,供计算使用。
6.2 Structured Streaming 架构
Structured Streaming 也是 Apache Spark 的实时流式处理框架。与 Spark Streaming 不同的是,Structured Streaming 采用了一个比 Spark Streaming 更高级、更专业的模型结构。
Structured Streaming 的架构与 Spark Streaming 非常相似。它由 Streaming QueryManager、DataStreamWriter、DataSourceV2、DataStreamReader、DataStreamReader 的不同插件和核心组件构成。
Streaming QueryManager 负责管理实时流式处理流程,它和 Cluster Manager 建立联系,获取 Executor 的资源信息,向 Driver 发送数据源的位置信息,并监控运行状况。DataSourceV2 插件负责从各种数据源读取数据,并转换为 DataStream。DataStreamReader 插件负责从已有的 DataStream 中读取数据。DataStreamWriter 插件负责写入数据到数据源。
Spark Streaming 和 Structured Streaming 都是实时流式处理框架,但它们各自在模型设计、架构实现、核心组件等方面都有一些不同。由于 Spark Streaming 的易用性和开源社区,许多企业都选择使用 Spark Streaming 来进行实时数据处理。
7.Spark Streaming 安装配置与运行
7.1 安装 Spark
安装 Spark 之前,请先安装 Hadoop。
sudo apt update && sudo apt install openjdk-8-jre -y
wget https://downloads.apache.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz
tar xvf spark-3.0.1-bin-hadoop3.2.tgz
mv spark-3.0.1-bin-hadoop3.2 /opt/spark
echo "export PATH=$PATH:/opt/spark/bin" >> ~/.bashrc
source ~/.bashrc
7.2 配置 Spark Streaming
编辑 Spark 的配置文件
conf/spark-env.sh
,设置环境变量
$SPARK_HOME
。
echo 'export SPARK_HOME=/opt/spark' >> conf/spark-env.sh
配置 Spark Streaming 的配置文件
conf/spark-defaults.conf
,设置 Spark Web UI 和日志级别。
echo'spark.eventLog.enabled true' >> conf/spark-defaults.conf
echo'spark.eventLog.dir file:///tmp/spark-events' >> conf/spark-defaults.conf
echo'spark.logConf false' >> conf/spark-defaults.conf
echo'spark.logLevel INFO' >> conf/spark-defaults.conf
7.3 创建 SparkSession
创建 SparkSession。
from pyspark import SparkContext
from pyspark.sql import SparkSession
sc = SparkContext("local", "MyApp")
ss = SparkSession(sc)
7.4 Spark Streaming 基本使用
编写一个最简单的 Spark Streaming 程序,它读取本地文件
README.md
文件的内容,并输出到控制台。
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == "__main__":
sc = SparkContext(appName="PythonStreamingTest")
ssc = StreamingContext(sc, 5) # batch interval is 5 seconds
lines = ssc.textFileStream("/path/to/directory/")
lines.pprint()
ssc.start() # Start the computation
ssc.awaitTermination() # Wait for the computation to terminate
7.5 运行 Spark Streaming
pyspark --jars spark-streaming-kafka-0-10_2.12-3.0.1.jar,/home/user/myproject/libs/*
8.Spark Streaming 流处理案例
8.1 用 Spark Streaming 实时处理 Kafka 数据
下载 ksqlDB 命令:
wget https://github.com/confluentinc/ksql/releases/download/v0.13.0/ksqldb-server-0.13.0-final-linux.zip
unzip ksqldb-server-0.13.0-final-linux.zip
cd ksqldb-server-0.13.0
下载 Connectors:
wget http://packages.confluent.io/archive/connectors/kafka-connect-elasticsearch-5.2.2/kafka-connect-elasticsearch-5.2.2.zip
unzip kafka-connect-elasticsearch-5.2.2.zip
将 Elasticsearch 服务开启:
systemctl start elasticsearch.service
修改 ksqlDB 配置文件
config/ksql-server.properties
,添加 Elasticsearch sink connector:
listeners=http://localhost:8088
bootstrap.servers=PLAINTEXT://localhost:9092
ksql.schema.registry.url=http://localhost:8081
ksql.streams.state.dir=/var/lib/ksql/data
ksql.sink.topic.auto.create=true
plugin.path=/usr/share/java,/etc/ksql,/home/mhope/Downloads/kafka-connect-elasticsearch-5.2.2/connectors/dist
kafka.connect.elasticsearch.jest.cluster.name=elasticsearch
kafka.connect.elasticsearch.connection.url=http://localhost:9200
kafka.connect.elasticsearch.type.name=logs
启动 ksqlDB Server:
./bin/ksql-server-start config/ksql-server.properties
创建 ksqlDB 数据库和表:
./bin/ksql http://localhost:8088 <<EOF
CREATE DATABASE log;
USE log;
CREATE TABLE logs (id INT PRIMARY KEY, message STRING);
EXIT ;
EOF
创建 SparkSession:
from pyspark import SparkContext
from pyspark.sql import SparkSession
sc = SparkContext(appName="PythonStreamingTest")
ss = SparkSession(sc)
编写 Spark Streaming 程序:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
def printLines(rdd):
"""
Print each line from an RDD obtained by reading a text file
:param rdd: input RDD with one element per line read from the file
"""
try:
for record in rdd.collect():
print(record)
except Exception as e:
pass
if __name__ == "__main__":
sc = SparkContext(appName="PythonStreamingTest")
ssc = StreamingContext(sc, 1) # batch interval is 1 second
kafkaParams = {
#'metadata.broker.list': 'localhost:9092',
'metadata.broker.list': '192.168.xxx.xx:9092',
'subscribe': ['logs']
}
lines = ssc.socketTextStream('localhost', 9999)
lines.foreachRDD(lambda rdd: printLines(rdd))
ssc.start() # Start the computation
ssc.awaitTermination() # Wait for the computation to terminate
启动数据源:
kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic logs
启动 Spark Streaming 程序:
$SPARK_HOME/bin/spark-submit \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,org.xerial.snappy:snappy-java:1.1.7.3 \
stream_logs.py
在另一个命令行窗口中,启动数据订阅者:
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic logs \
--from-beginning
发送测试数据:
kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic logs
This is test data
And this is more test data
查看 Elasticsearch 索引中的数据:
curl -XGET http://localhost:9200/_search?pretty
{
"query": {
"match_all": {}
},
"sort": [
"_doc"
]
}
8.2 用 Spark Streaming 实时分析 ClickHouse 日志
下载 Elasticsearch commandline tools:
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.5.2-linux-x86_64.tar.gz
tar zxvf elasticsearch-7.5.2-linux-x86_64.tar.gz
mkdir ~/elastic && mv elasticsearch-7.5.2 ~/elastic/es-7.5.2
配置 Elasticsearch:
mkdir ~/elastic/es-7.5.2/config/scripts
touch ~/elastic/es-7.5.2/config/scripts/custom-mapping.sh
chmod +x ~/elastic/es-7.5.2/config/scripts/custom-mapping.sh
vim ~/elastic/es-7.5.2/config/scripts/custom-mapping.sh
#!/bin/bash
curl -X PUT "$1/$2/_mapping/" -d '{
"properties": {
"@timestamp": {"type": "date"},
"level": {"type": "keyword"}
}
}'
nohup./bin/elasticsearch > es.out &
启动 Elasticsearch:
./config/scripts/custom-mapping.sh http://localhost:9200 clickhouse_*
nohup ~/elastic/es-7.5.2/bin/elasticsearch > es.out &
创建 MySQL 数据库:
mysql -u root -p
Enter password:
MariaDB [(none)]> CREATE DATABASE IF NOT EXISTS clickhouse;
创建 ClickHouse 表:
cat create_table.sql | mysql -u default -p clickhouse
Enter password:
配置 ClickHouse JDBC driver:
vim application.properties
jdbc.driverClassName=ru.yandex.clickhouse.ClickHouseDriver
jdbc.url=jdbc:clickhouse://localhost:8123/default
jdbc.username=default
jdbc.password=
创建 SparkSession:
from pyspark import SparkContext
from pyspark.sql import SparkSession
sc = SparkContext(appName="PythonStreamingTest")
ss = SparkSession(sc)
编写 Spark Streaming 程序:
import re
import time
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
def process_message(message):
if not message:
return
timestamp = int(time.mktime(time.strptime(re.findall('\[(.*?)\]', message)[0], '%Y-%m-%d %H:%M:%S'))) * 1000
level = re.findall('(INFO|WARNING|ERROR)', message)[0]
sql = f"""
INSERT INTO clickhouse.{level}(
@timestamp,
message
) VALUES ({timestamp}, '{message}')
"""
ss.sql(sql).collect()
if __name__ == "__main__":
sc = SparkContext(appName="PythonStreamingTest")
ssc = StreamingContext(sc, 1) # batch interval is 1 second
kafkaParams = {
#'metadata.broker.list': 'localhost:9092',
'metadata.broker.list': '192.168.xxx.xx:9092',
'group.id': 'clickhouse_logs',
'auto.offset.reset': 'latest'
}
messages = ssc.readStream\
.format('kafka')\
.option('kafka.bootstrap.servers', ','.join([f'{ip}:9092' for ip in ['192.168.xxx.xx', '192.168.xxx.xx']]))\
.option('subscribe', 'clickhouse.*')\
.load()\
.selectExpr('CAST(value AS STRING)')\
.asJavaDStream()
messages.foreachRDD(lambda rdd: rdd.foreachPartition(process_message))
ssc.start() # Start the computation
ssc.awaitTermination() # Wait for the computation to terminate
启动数据源:
kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic clickhouse.info \
--property parse.key=false \
--property key.separator='|'
启动 Spark Streaming 程序:
$SPARK_HOME/bin/spark-submit \
--class com.example.ClickhouseLogsProcessor \
--master local[*] \
--jars $CLICKHOUSE_JDBC_JAR,$ELASTICSEARCH_PYTHON_VERSION.jar \
--files application.properties \
stream_clickhouse_logs.py
发送测试数据:
kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic clickhouse.info \
--property parse.key=false \
--property key.separator='|'
[2020-11-19 00:00:01][INFO] Starting new clickhouse instance
[2020-11-19 00:00:01][INFO] Instance started successfully
查看 Elasticsearch 索引中的数据:
curl -XGET http://localhost:9200/_search?pretty
{
"query": {
"match_all": {}
},
"sort": [
"_doc"
],
"size": 10
}
版权归原作者 禅与计算机程序设计艺术 所有, 如有侵权,请联系我们删除。