本文总计 2200 字,预计阅读需要 5-7分钟
一般情况下我们都会使用Spark自带的webUI来进行集群的和提交任务的运行状况的监控,虽然我们优化Spark任务主要就是靠这些信息,但是它们只提供了基础架构概述,而不是与我们运行任务中数据相关的实际指标。例如当CPU使用率增加或集群耗尽内存时应用程序出现了问题、数据源更改模式或来自其他部门的数据损坏时,只能看到任务失败了而且没有任何的其他信息。这时候我们如果进行排查的话就要花费很多的时间,甚至有时连问题的定位都很困难。
我们可以使用Listener APIs 或者 Query Execution Listeners来获取我们的应用程序日志,还有一些已经封装好的现成的包来帮我们解决这个问题。
Low-Level
Spark Listener
这是一种非常古老且可靠的获取参数的方法(这其实就是Spark WebUI使用机制)。Listener API可以跟踪我们在应用执行时发送的事件消息。这些事件包括应用程序开始/结束、作业开始/结束、阶段开始/结束等(在Spark JavaDoc中有完整的列表)。配置和使用Spark Listeners获取指标是非常简单的因为在执行每个操作之后Spark都会自动的调用Spark Listener并将一些元数据信息传递给它。内容包括诸如执行时间、读取/写入的记录、读取/写入的字节和一些其他内容。
这种非常基础和低级的数据质量监控可以帮助我们检查记录一些基本信息。例如每天运行的应用我们可以编写一个listener来检查从输入读取了多少条记录并将其与前一天的结果进行比较。当差异很大时我们可以假设数据源可能有问题,发送并记录相应的信息。
但是这种方法需要我们自己编写监控的完整解决方案。例如:度量值存在哪里、怎么获取对比、配置警报机制等等。并且当应用程序代码发生变化时,所有的指标也可能发生变化这部分也要手动处理。
以下是Spark Listener的一个例子:
publicclassSomeSparkListenerextendsSparkListener {
@OverridepublicvoidonStageCompleted(SparkListenerStageCompleted stageCompleted) { StageInfo stageInfo = stageCompleted.stageInfo(); Iterator<AccumulatorV2<?, ?>> it = stageInfo.taskMetrics().accumulators().iterator();while (it.hasNext()) { AccumulatorV2<?, ?> next = it.next(); String key = next.name().get(); Object value = next.value(); System.out.printf("key: %s, value: %s%n", key, value); } }}
我们可以通过多种方式将 Spark Listener添加到应用中:
SparkSession spark = SparkSession.builder().getOrCreate();spark.sparkContext().addSparkListener(new SomeSparkListener());
也可以在提交的时候配置参数,这样就将这部分与代码逻辑分离了
spark-submit --conf "spark.extraListeners=yikai.demo.SomeSparkListener"
**Spark Query Execution Listener**
这是另一种 Spark 监控机制, 我们可以通过Query Execution Listener 订阅查询完成的事件,而不是关注指标,例如:它提供了逻辑和物理计划以及执行指标等更高级的元数据。通过Query Execution Listener可以查询读取/写入的记录等指标,但是所获得信息是针对整个查询而不是特定任务/作业/阶段的。
如果需要查询特定的指标就需要从数据位置和schema中提取信息。例如提取特定的dataframe的信息然后进与之前存储信息的对比,所以从这种执行计划中提取数据必须要使用low-level Spark API.这对于一般人来说可能会略有复杂。
这里有一个简单的样例:打印计划和指标的示例:
publicclassExampleQueryExecutionListenerimplementsQueryExecutionListener{
@OverridepublicvoidonSuccess(String funcName, QueryExecution qe, long durationNs){ System.out.println(qe.executedPlan().prettyJson()); Iterator<Tuple2<String, SQLMetric>> it = qe.executedPlan().metrics().iterator();while (it.hasNext()) { Tuple2<String, SQLMetric> next = it.next(); System.out.printf("Key: %s, value: %s%n", next._1(), next._2().value()); } }
@OverridepublicvoidonFailure(String funcName, QueryExecution qe, Exception exception){
}}
Query Execution Listener也可以使用多种方式添加
SparkSession spark = SparkSession.builder().getOrCreate();spark.listenerManager().register(new ExampleQueryExecutionListener());
spark-submit提交时:
spark-submit --conf "spark.extraListeners=yikai.demo.ExampleQueryExecutionListener"
上面介绍的都是Spark内置API,这种方式对于开发和维护的工作量会很大,但是这种监视方式有一个好处就是它不引入计算开销并且对业务代码的无入侵。任务执行信息是都是由Spark内部构件发出和记录的,因此不会对查询执行时间造成任何影响。
High-level
手动进行数据质量检查
通过手动验证传入数据是最简单和直接的方法。假设我们期望在输入数据源中有一些记录,这个数字通常不应该低于x。我们可以直接写一些非常简单的代码,比如:
df = spark.read("path")if (df.count < X) {thrownew RuntimeException("Input data is missing")}
这种方式非常简单和直接。也是我们最常用的一种方法
使用现成的程序包进行数据质量检查
由于许多质量检查都是可以进行抽象和复用的,因此开源的社区会提供一些现成的包供我们使用,其中就是Deequ。它为大多数情况提供了丰富的DSL,我们可以称之为“数据的单元测试”。除此以外他还提供分析列的能力。比如,计算最小/最大/平均值/百分位数,计算直方图,检测异常等功能。
下面是一个简单的使用样例:
val verificationResult = VerificationSuite() .onData(data) .addCheck( Check(CheckLevel.Error, "unit testing my data yikai demo") .hasSize(_ == 5) // 期望返回5行数据 .isComplete("id") // 不为空 .isUnique("id") // 唯一 .isComplete("productName") .isContainedIn("priority", Array("high", "low")) //只包含high、low2个值 .isNonNegative("numViews") // 不包含复数 .run()
Deequ通过这种方式可以让我们方便的进行数据的检查并且提供了存储检查结果并自动运行与以前结果的比较的功能。可以通过这种方式编写自己的实现并将Deequ无缝集成到现有的监控系统中。
虽然High-level 质量检查比Low-Level方法灵活得多但它们也有一个很大的缺点:性能损失。由于每次计算都会产生额外的spark操作,因此在某些情况下开销会非常大,特别是在大型数据集上,每一个“count”和“where”都可以导致完全扫描。虽然Spark内部会尽力优化执行计划,但还是应该考虑这些影响并确保这些操作不会占用更多的资源而影响业务。
总结
本篇文章介绍了Spark应用程序监控数据质量的几种方法。Spark Event Listeners API可以提供对低级指标的访问,如记录读/写、逻辑/物理计划等,对于构建和确保数据流产生正确的结果以及在不修改任何代码的情况下对现有应用程序进行监控是很有用。
手工检查数据或使用Deequ包等高级方法要方便得多,但会占用更多的集群资源。
这两种方法我们都在使用,并且根据不同的场景也都进行了优化,这两种该方案没有好坏之分,使用那一个还是要看应用程序类型。