0


Flink面试题

一 基础篇

Flink的执行图有哪几种?分别有什么作用

Flink中的执行图一般是可以分为四类,按照生成顺序分别为:StreamGraph-> JobGraph-> ExecutionGraph->物理执行图。

1)StreamGraph

顾名思义,这里代表的是我们编写的流程序图。通过Stream API生成,这是执行图的最原始拓扑数据结构。

2)JobGraph

StreamGraph在Client中经过算子chain链合并等优化,转换为JobGraph拓扑图,随后被提交到JobManager中。

3)ExecutionGraph

JobManager中将JobGraph进一步转换为ExecutionGraph,此时ExecutuonGraph根据算子配置的并行度转变为并行化的Graph拓扑结构。

4)物理执行图

比较偏物理执行概念,即JobManager进行Job调度,TaskManager最终部署Task的图结构。


Flink的窗口机制

在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。

窗口可以是时间驱动的(Time Window,例如:每30秒钟),也可以是数据驱动的(Count Window,例如:每一百个元素)。一种经典的窗口分类可以分成:翻滚窗口(Tumbling Window,无重叠),滚动窗口(Sliding Window,有重叠),和会话窗口(Session Window,活动间隙)。

我们举个具体的场景来形象地理解不同窗口的概念。假设,淘宝网会记录每个用户每次购买的商品个数,我们要做的是统计不同窗口中用户购买商品的总数。下图给出了几种经典的窗口切分概述图:

上图中,raw data stream 代表用户的购买行为流,圈中的数字代表该用户本次购买的商品个数,事件是按时间分布的,所以可以看出事件之间是有time gap的。Flink 提供了上图中所有的窗口类型,下面我们会逐一进行介绍。

Time Window

就如名字所说的,Time Window 是根据时间对数据流进行分组的。这里我们涉及到了流处理中的时间问题,时间问题和消息乱序问题是紧密关联的,这是流处理中现存的难题之一,我们将在后续的 EventTime 和消息乱序处理 中对这部分问题进行深入探讨。这里我们只需要知道 Flink 提出了三种时间的概念,分别是event time(事件时间:事件发生时的时间),ingestion time(摄取时间:事件进入流处理系统的时间),processing time(处理时间:消息被计算处理的时间)。Flink 中窗口机制和时间类型是完全解耦的,也就是说当需要改变时间类型时不需要更改窗口逻辑相关的代码。

  • Tumbling Time Window如上图,我们需要统计每一分钟中用户购买的商品的总数,需要将用户的行为事件按每一分钟进行切分,这种切分被成为翻滚时间窗口(Tumbling Time Window)。翻滚窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口。通过使用 DataStream API,我们可以这样实现:
// Stream of (userId, buyCnt)val buyCnts: DataStream[(Int, Int)] = ...

val tumblingCnts: DataStream[(Int, Int)] = buyCnts
  // key stream by userId
  .keyBy(0) 
  // tumbling time window of 1 minute length
  .timeWindow(Time.minutes(1))
  // compute sum over buyCnt
  .sum(1) 
  • Sliding Time Window但是对于某些应用,它们需要的窗口是不间断的,需要平滑地进行窗口聚合。比如,我们可以每30秒计算一次最近一分钟用户购买的商品总数。这种窗口我们称为滑动时间窗口(Sliding Time Window)。在滑窗中,一个元素可以对应多个窗口。通过使用 DataStream API,我们可以这样实现:
val slidingCnts: DataStream[(Int, Int)] = buyCnts
  .keyBy(0) 
  // sliding time window of 1 minute length and 30 secs trigger interval
  .timeWindow(Time.minutes(1), Time.seconds(30))
  .sum(1)

Count Window

Count Window 是根据元素个数对数据流进行分组的。

  • Tumbling Count Window当我们想要每100个用户购买行为事件统计购买总数,那么每当窗口中填满100个元素了,就会对窗口进行计算,这种窗口我们称之为翻滚计数窗口(Tumbling Count Window),上图所示窗口大小为3个。通过使用 DataStream API,我们可以这样实现:
// Stream of (userId, buyCnts)val buyCnts: DataStream[(Int, Int)] = ...

val tumblingCnts: DataStream[(Int, Int)] = buyCnts
  // key stream by sensorId
  .keyBy(0)
  // tumbling count window of 100 elements size
  .countWindow(100)
  // compute the buyCnt sum 
  .sum(1)
  • Sliding Count Window当然Count Window 也支持 Sliding Window,虽在上图中未描述出来,但和Sliding Time Window含义是类似的,例如计算每10个元素计算一次最近100个元素的总和,代码示例如下。
val slidingCnts: DataStream[(Int, Int)] = vehicleCnts
  .keyBy(0)
  // sliding count window of 100 elements size and 10 elements trigger interval
  .countWindow(100, 10)
  .sum(1)

Session Window

在这种用户交互事件流中,我们首先想到的是将事件聚合到会话窗口中(一段用户持续活跃的周期),由非活跃的间隙分隔开。如上图所示,就是需要计算每个用户在活跃期间总共购买的商品数量,如果用户30秒没有活动则视为会话断开(假设raw data stream是单个用户的购买行为流)。Session Window 的示例代码如下:

// Stream of (userId, buyCnts)val buyCnts: DataStream[(Int, Int)] = ...
    
val sessionCnts: DataStream[(Int, Int)] = vehicleCnts
    .keyBy(0)
    // session window based on a 30 seconds session gap interval 
    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(30)))
    .sum(1)

一般而言,window 是在无限的流上定义了一个有限的元素集合。这个集合可以是基于时间的,元素个数的,时间和个数结合的,会话间隙的,或者是自定义的。Flink 的 DataStream API 提供了简洁的算子来满足常用的窗口操作,同时提供了通用的窗口机制来允许用户自己定义窗口分配逻辑。下面我们会对 Flink 窗口相关的 API 进行剖析。

Flink中的时间概念

Flink在流处理程序支持不同的时间概念。分别为Event Time/Processing Time/Ingestion Time,也就是事件时间、处理时间、提取时间。

从时间序列角度来说,发生的先后顺序是:

事件时间(Event Time)----> 提取时间(Ingestion Time)----> 处理时间(Processing Time)

复制

Event Time 是事件在现实世界中发生的时间,它通常由事件中的时间戳描述。

Ingestion Time 是数据进入Apache Flink流处理系统的时间,也就是Flink读取数据源时间。

Processing Time 是数据流入到具体某个算子 (消息被计算处理) 时候相应的系统时间。也就是Flink程序处理该事件时当前系统时间。

但是我们讲解时,会从后往前讲解,把最重要的Event Time放在最后。

处理时间

是数据流入到具体某个算子时候相应的系统时间。

这个系统时间指的是执行相应操作的机器的系统时间。当一个流程序通过处理时间来运行时,所有基于时间的操作(如: 时间窗口)将使用各自操作所在的物理机的系统时间。

ProcessingTime 有最好的性能和最低的延迟。但在分布式计算环境或者异步环境中,ProcessingTime具有不确定性,相同数据流多次运行有可能产生不同的计算结果。因为它容易受到从记录到达系统的速度(例如从消息队列)到记录在系统内的operator之间流动的速度的影响(停电,调度或其他)。

提取时间

IngestionTime是数据进入Apache Flink框架的时间,是在Source Operator中设置的。每个记录将源的当前时间作为时间戳,并且后续基于时间的操作(如时间窗口)引用该时间戳。

提取时间在概念上位于事件时间和处理时间之间。与处理时间相比,它稍早一些。IngestionTime与ProcessingTime相比可以提供更可预测的结果,因为IngestionTime的时间戳比较稳定(在源处只记录一次),所以同一数据在流经不同窗口操作时将使用相同的时间戳,而对于ProcessingTime同一数据在流经不同窗口算子会有不同的处理时间戳。

与事件时间相比,提取时间程序无法处理任何无序事件或后期数据,但程序不必指定如何生成水位线。

在内部,提取时间与事件时间非常相似,但具有自动时间戳分配和自动水位线生成功能。

事件时间

事件时间就是事件在真实世界的发生时间,即每个事件在产生它的设备上发生的时间(当地时间)。比如一个点击事件的时间发生时间,是用户点击操作所在的手机或电脑的时间。

在进入Apache Flink框架之前EventTime通常要嵌入到记录中,并且EventTime也可以从记录中提取出来。在实际的网上购物订单等业务场景中,大多会使用EventTime来进行数据计算。

Flink的watermark

Watermark是Apache Flink为了处理EventTime 窗口计算提出的一种机制,本质上也是一种时间戳。watermark是用于处理乱序事件或延迟数据的,这通常用watermark机制结合window来实现(Watermarks用来触发window窗口计算)。

比如对于late element,我们不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。 可以把Watermark看作是一种告诉Flink一个消息延迟多少的方式。定义了什么时候不再等待更早的数据。

1. 窗口触发条件

上面谈到了对数据乱序问题的处理机制是watermark+window,那么window什么时候该被触发呢?

基于Event Time的事件处理,Flink默认的事件触发条件为:

对于out-of-order及正常的数据而言

watermark的时间戳 > = window endTime

在 [window_start_time,window_end_time] 中有数据存在。

对于late element太多的数据而言

Event Time > watermark的时间戳

WaterMark相当于一个EndLine,一旦Watermarks大于了某个window的end_time,就意味着windows_end_time时间和WaterMark时间相同的窗口开始计算执行了。

就是说,我们根据一定规则,计算出Watermarks,并且设置一些延迟,给迟到的数据一些机会,也就是说正常来讲,对于迟到的数据,我只等你一段时间,再不来就没有机会了。

WaterMark时间可以用Flink系统现实时间,也可以用处理数据所携带的Event time。

使用Flink系统现实时间,在并行和多线程中需要注意的问题较少,因为都是以现实时间为标准。

如果使用处理数据所携带的Event time作为WaterMark时间,需要注意两点:

因为数据到达并不是循序的,注意保存一个当前最大时间戳作为WaterMark时间

并行同步问题

2. WaterMark设定方法

标点水位线(Punctuated Watermark)

标点水位线(Punctuated Watermark)通过数据流中某些特殊标记事件来触发新水位线的生成。这种方式下窗口的触发与时间无关,而是决定于何时收到标记事件。

在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。

定期水位线(Periodic Watermark)

周期性的(允许一定时间间隔或者达到一定的记录条数)产生一个Watermark。水位线提升的时间间隔是由用户设置的,在两次水位线提升时隔内会有一部分消息流入,用户可以根据这部分数据来计算出新的水位线。

在实际的生产中Periodic的方式必须结合时间和积累条数两个维度继续周期性产生Watermark,否则在极端情况下会有很大的延时。

举个例子,最简单的水位线算法就是取目前为止最大的事件时间,然而这种方式比较暴力,对乱序事件的容忍程度比较低,容易出现大量迟到事件。

3. 迟到事件

虽说水位线表明着早于它的事件不应该再出现,但是上如上文所讲,接收到水位线以前的的消息是不可避免的,这就是所谓的迟到事件。实际上迟到事件是乱序事件的特例,和一般乱序事件不同的是它们的乱序程度超出了水位线的预计,导致窗口在它们到达之前已经关闭。

迟到事件出现时窗口已经关闭并产出了计算结果,因此处理的方法有3种:

重新激活已经关闭的窗口并重新计算以修正结果。

将迟到事件收集起来另外处理。

将迟到事件视为错误消息并丢弃。

Flink 默认的处理方式是第3种直接丢弃,其他两种方式分别使用Side Output和Allowed Lateness。

Side Output机制可以将迟到事件单独放入一个数据流分支,这会作为 window 计算结果的副产品,以便用户获取并对其进行特殊处理。

Allowed Lateness机制允许用户设置一个允许的最大迟到时长。Flink 会在窗口关闭后一直保存窗口的状态直至超过允许迟到时长,这期间的迟到事件不会被丢弃,而是默认会触发窗口重新计算。因为保存窗口状态需要额外内存,并且如果窗口计算使用了 ProcessWindowFunction API 还可能使得每个迟到事件触发一次窗口的全量计算,代价比较大,所以允许迟到时长不宜设得太长,迟到事件也不宜过多,否则应该考虑降低水位线提高的速度或者调整算法。

这里总结机制为:

窗口window 的作用是为了周期性的获取数据。

watermark的作用是防止数据出现乱序(经常),事件时间内获取不到指定的全部数据,而做的一种保险方法。

allowLateNess是将窗口关闭时间再延迟一段时间。

sideOutPut是最后兜底操作,所有过期延迟数据,指定窗口已经彻底关闭了,就会把数据放到侧输出流。

4.例子

假如我们设置10s的时间窗口(window),那么010s,1020s都是一个窗口,以0~10s为例,0为start-time,10为end-time。假如有4个数据的event-time分别是8(A),12.5(B),9(C),13.5(D),我们设置Watermarks为当前所有到达数据event-time的最大值减去延迟值3.5秒

当A到达的时候,Watermarks为max{8}-3.5=8-3.5 = 4.5 < 10,不会触发计算

当B到达的时候,Watermarks为max(12.5,8)-3.5=12.5-3.5 = 9 < 10,不会触发计算

当C到达的时候,Watermarks为max(12.5,8,9)-3.5=12.5-3.5 = 9 < 10,不会触发计算

当D到达的时候,Watermarks为max(13.5,12.5,8,9)-3.5=13.5-3.5 = 10 = 10,触发计算

触发计算的时候,会将A,C(因为他们都小于10)都计算进去,其中C是迟到的。 max这个很关键,就是当前窗口内,所有事件的最大事件。 这里的延迟3.5s是我们假设一个数据到达的时候,比他早3.5s的数据肯定也都到达了,这个是需要根据经验推算。假设加入D到达以后有到达了一个E,event-time=6,但是由于0~10的时间窗口已经开始计算了,所以E就丢了。 从这里上面E的丢失说明,水位线也不是万能的,但是如果根据我们自己的生产经验+侧道输出等方案,可以做到数据不丢失。

Flink分布式快照原理是什么

可靠性是分布式系统实现必须考虑的因素之一。Flink基于Chandy-Lamport分布式快照算法实现了一套可靠的Checkpoint机制,可以保证集群中某些节点出现故障时,能够将整个作业恢复到故障之前某个状态。同时,Checkpoint机制也是Flink实现Exactly-Once语义的基础。

本文将介绍Flink的Checkpoint机制的原理,并从源码层面了解Checkpoint机制是如何实现的(基于Flink 1.10)。

1. 为什么需要Checkpoint

Flink是有状态的流计算处理引擎,每个算子Operator可能都需要记录自己的运行数据,并在接收到新流入的元素后不断更新自己的状态数据。当分布式系统引入状态计算后,为了保证计算结果的正确性(特别是对于流处理系统,不可能每次系统故障后都从头开始计算),就必然要求系统具有容错性。对于Flink来说,Flink作业运行在多个节点上,当出现节点宕机、网络故障等问题,需要一个机制保证节点保存在本地的状态不丢失。流处理中Exactly-Once语义的实现也要求作业从失败恢复后的状态要和失败前的状态一致。

那么怎么保证分布式环境下各节点状态的容错呢?通常这是通过定期对作业状态和数据流进行快照实现的,常见的检查点算法有比如Sync-and-Stop(SNS)算法、Chandy-Lamport(CL)算法。

Flink的Checkpoint机制是基于Chandy-Lamport算法的思想改进而来,引入了Checkpoint Barrier的概念,可以在不停止整个流处理系统的前提下,让每个节点独立建立检查点保存自身快照,并最终达到整个作业全局快照的状态。有了全局快照,当我们遇到故障或者重启的时候就可以直接从快照中恢复,这就是Flink容错的核心。

2. Checkpoint执行流程

Barrier是Flink分布式快照的核心概念之一,称之为屏障或者数据栅栏(可以理解为快照的分界线)。Barrier是一种特殊的内部消息,在进行Checkpoint的时候Flink会在数据流源头处周期性地注入Barrier,这些Barrier会作为数据流的一部分,一起流向下游节点并且不影响正常的数据流。Barrier的作用是将无界数据流从时间上切分成多个窗口,每个窗口对应一系列连续的快照中的一个,每个Barrier都带有一个快照ID,一个Barrier生成之后,在这之前的数据都进入此快照,在这之后的数据则进入下一个快照。

如上图,Barrier-n跟随着数据流一起流动,当算子从输入流接收到Barrier-n后,就会停止接收数据并对当前自身的状态做一次快照,快照完成后再将Barrier-n以广播的形式传给下游节点。一旦作业的Sink算子接收到Barrier n后,会向JobMnager发送一个消息,确认Barrier-n对应的快照完成。当作业中的所有Sink算子都确认后,意味一次全局快照也就完成。

当一个算子有多个上游节点时,会接收到多个Barrier,这时候需要进行Barrier Align对齐操作。

如上图,一个算子有两个输入流,当算子从一个上游数据流接收到一个Barrier-n后,它不会立即向下游广播,而是先暂停对该数据流的处理,将到达的数据先缓存在Input Buffer中(因为这些数据属于下一次快照而不是当前快照,缓存数据可以不阻塞该数据流),直到从另外一个数据流中接收到Barrier-n,才会进行快照处理并将Barrier-n向下游发送。从这个流程可以看出,如果开启Barrier对齐后,算子由于需要等待所有输入节点的Barrier到来出现暂停,对整体的性能也会有一定的影响。

综上,Flink Checkpoint机制的核心思想实质上是通过Barrier来标记触发快照的时间点和对应需要进行快照的数据集,将数据流处理和快照操作解耦开来,从而最大程度降低快照对系统性能的影响。

Flink的一致性和Checkpoint机制有紧密的关系:

当不开启Checkpoint时,节点发生故障时可能会导致数据丢失,这就是At-Most-Once

当开启Checkpoint但不进行Barrier对齐时,对于有多个输入流的节点如果发生故障,会导致有一部分数据可能会被处理多次,这就是At-Least-Once

当开启Checkpoint并进行Barrier对齐时,可以保证每条数据在故障恢复时只会被重放一次,这就是Exactly-Once

3. Checkpoint相关配置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

默认情况下,Checkpoint机制是关闭的,需要通过enableCheckpointing(interval)来开启,并指定每interval毫秒进行一次Checkpoint。

Checkpoint模式支持Exactly-Once和At-Least-Once,可以通过setCheckpointingMode来设置。

如果两次Checkpoint的时间很短,会导致整个系统大部分资源都用于执行Checkpoint,影响正常作业的执行。可以通过setMinPauseBetweenCheckpoints来设置两次Checkpoint之间的最小间隔。

setCheckpointTimeout可以给Checkpoint设置一个超时时间,当一次Checkpoint超过一定时间没有完成,直接终止掉。

默认情况下,当一个Checkpoint还在执行时,不会触发另一个Checkpoint,通过setMaxConcurrentCheckpoints可以设置最大并发Checkpoint数量。

enableExternalizedCheckpoints可以设置当用户取消了作业后,是否保留远程存储上的Checkpoint数据,一般设置为RETAIN_ON_CANCELLATION。

保存多个Checkpoint

默认情况下,如果设置了Checkpoint选项,则Flink只保留最近成功生成的1个Checkpoint,而当Flink程序失败时,可以从最近的这个Checkpoint来进行恢复。但是,如果我们希望保留多个Checkpoint,并能够根据实际需要选择其中一个进行恢复,这样会更加灵活,比如,我们发现最近4个小时数据记录处理有问题,希望将整个状态还原到4小时之前。

Flink可以支持保留多个Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的个数:

state.checkpoints.num-retained: 20

保留了最近的20个Checkpoint。如果希望会退到某个Checkpoint点,只需要指定对应的某个Checkpoint路径即可实现。

从Checkpoint进行恢复

从指定的checkpoint处启动,最近的一个/flink/checkpoints/workFlowCheckpoint/339439e2a3d89ead4d71ae3816615281/chk-1740584启动,通常需要先停掉当前运行的flink-session,然后通过命令启动:

../bin/flink run -p 10 -s /flink/checkpoints/workFlowCheckpoint/339439e2a3d89ead4d71ae3816615281/chk-1740584/_metadata -c com.code2144.helper_wink-1.0-SNAPSHOT.jar

可以把命令放到脚本里面,每次直接执行checkpoint恢复脚本即可:

保存点机制 (Savepoints)

保存点机制 (Savepoints)是检查点机制的一种特殊的实现,它允许通过手工的方式来触发 Checkpoint,并将结果持久化存储到指定路径中,主要用于避免 Flink 集群在重启或升级时导致状态丢失。示例如下:

触发指定id的作业的Savepoint,并将结果存储到指定目录下

bin/flink savepoint :jobId [:targetDirectory]

手动savepoint

/app/local/flink-1.6.2/bin/flink savepoint 0409251eaff826ef2dd775b6a2d5e219  [hdfs://bigdata/path]

成功触发savepoint通常会提示:Savepoint completed. Path: hdfs://path...:

手动取消任务

与checkpoint异常停止或者手动Kill掉不一样,对于savepoint通常是我们想要手动停止任务,然后更新代码,可以使用flink cancel ...命令:

/app/local/flink-1.6.2/bin/flink cancel 0409251eaff826ef2dd775b6a2d5e219

从指定savepoint启动job

bin/flink run -p 8 -s hdfs:///flink/savepoints/savepoint-567452-9e3587e55980 -c com.code2144.helper_workflow.HelperWorkFlowStreaming jars/BSS-ONSS-Flink-1.0-SNAPSHOT.jar

Flink的内存管理是如何做的

在介绍内存管理之前,先介绍一下JVM中的堆内存和堆外内存。

通常来说。JVM堆空间概念,简单描述就是在程序中,关于对象实例|数组的创建、使用和释放的内存,都会在JVM中的一块被称作为"JVM堆"内存区域内进行管理分配。

Flink程序在创建对象后,JVM会在堆内内存中分配一定大小的空间,创建Class对象并返回对象引用,Flink保存对象引用,同时记录占用的内存信息。

而堆外内存如果你有过Java相关编程经历的话,相信对堆外内存的使用并不陌生。其底层调用基于C的JDK Unsafe类方法,通过指针直接进行内存的操作,包括内存空间的申请、使用、删除释放等。

介绍完了堆内内存和堆外内存的概念,下面我们来看下Flink的内存管理。

1)JobManager内存管理

JobManager进程总内存包括JVM堆内内存、JVM堆外内存以及JVM MetaData内存,其中涉及的内存配置参数为:

# JobManager总进程内存
jobmanager.memory.process.size:

# 作业管理器的 JVM 堆内存大小
jobmanager.memory.heap.size:

#作业管理器的堆外内存大小。此选项涵盖所有堆外内存使用。
jobmanager.memory.off-heap.size:
复制代码

2)TaskManager内存管理

TaskManager内存同样包含JVM堆内内存、JVM堆外内存以及JVM MetaData内存三大块。其中JVM堆内内存又包含Framework Heap和Task Heap,即框架堆内存和任务Task堆内存。

JVM堆外内存包含Memory memory托管内存,主要用于保存排序、结果缓存、状态后端数据等。另一块为Direct Memory直接内存,包含如下:

  • Framework Off-Heap Memory:Flink框架的堆外内存,即Flink中TaskManager的自身内存,和slot无关。

  • Task Off-Heap:Task的堆外内存

  • Network Memory:网络内存

其中涉及的内存配置参数为:

// tm的框架堆内内存
taskmanager.memory.framework.heap.size=

// tm的任务堆内内存
taskmanager.memory.task.heap.size

// Flink管理的原生托管内存
taskmanager.memory.managed.size=
taskmanager.memory.managed.fraction=

// Flink 框架堆外内存
taskmanager.memory.framework.off-heap.size=

// Task 堆外内存
taskmanager.memory.task.off-heap.size=

// 网络数据交换所使用的堆外内存大小
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 1gb
taskmanager.memory.network.fraction: 0.1复制代码

Flink/Spark/Hive SQL的执行原理

这里我把三个组件SQL执行原理放到了一起,通过对比加深一下印象。

1)Hive SQL的执行原理

Hive SQL是Hive提供的SQL查询引擎,底层由MapReduce实现。Hive根据输入的SQL语句执行词法分析、语法树构建、编译、逻辑计划、优化逻辑计划以及物理计划等过程,转化为Map Task和Reduce Task最终交由Mapreduce引擎执行。

  • 执行引擎。具有mapreduce的一切特性,适合大批量数据离线处理,相较于Spark而言,速度较慢且IO操作频繁

  • 有完整的hql语法,支持基本sql语法、函数和udf

  • 对表数据存储格式有要求,不同存储、压缩格式性能不同

2)Spark SQL的执行原理

Spark SQL底层基于Spark引擎,使用Antlr解析语法,编译生成逻辑计划和物理计划,过程和Hive SQL执行过程类似,只不过Spark SQL产生的物理计划为Spark程序。

  • 输入编写的Spark SQL

  • SqlParser分析器。进行语法检查、词义分析,生成未绑定的Logical Plan逻辑计划(未绑定查询数据的元数据信息,比如查询什么文件,查询那些列等)

  • Analyzer解析器。查询元数据信息并绑定,生成完整的逻辑计划。此时可以知道具体的数据位置和对象,Logical Plan 形如from table -> filter column -> select 形式的树结构

  • Optimizer优化器。选择最好的一个Logical Plan,并优化其中的不合理的地方。常见的例如谓词下推、剪枝、合并等优化操作

  • Planner使用Planing Strategies将逻辑计划转化为物理计划,并根据最佳策略选择出的物理计划作为最终的执行计划

  • 调用Spark Plan Execution执行引擎执行Spark RDD任务

3)Flink SQL的执行原理

一条SQL从提交到Calcite解析,优化,到最后的Flink执行,一般分以下过程:

  1. Sql Parser: 将sql语句通过java cc解析成AST(语法树),在calcite中用SqlNode表示AST;

  2. Sql Validator: 结合数字字典(catalog)去验证sql语法;

  3. 生成Logical Plan: 将sqlNode表示的AST转换成LogicalPlan, 用relNode表示;

  4. 生成 optimized LogicalPlan: 先基于calcite rules 去优化logical Plan,基于flink定制的一些优化rules去优化logical Plan;

  5. 生成Flink PhysicalPlan: 这里也是基于flink里头的rules将,将optimized LogicalPlan转成成Flink的物理执行计划;

  6. 将物理执行计划转成Flink ExecutionPlan: 就是调用相应的tanslateToPlan方法转换和利用CodeGen元编程成Flink的各种算子。

Table API 来提交任务的话,基本流程和运行SQL类似,稍微不同的是:table api parser: flink会把table api表达的计算逻辑也表示成一颗树,用treeNode去表式; 在这棵树上的每个节点的计算逻辑用Expression来表示。

简单说一下SQL优化:RBO(基于规则)

RBO主要是开发人员在使用SQL的过程中,有些发现有些通用的规则,可以显著提高SQL执行的效率,比如最经典的filter下推:

将Filter下推到Join之前执行,这样做的好处是减少了Join的数量,同时降低了CPU,内存,网络等方面的开销,提高效率。

SQL优化的发展,则可以分为两个阶段,即RBO(基于规则),和CBO(基于代价)

RBO和CBO的区别大概在于: RBO只为应用提供的rule,而CBO会根据给出的Cost信息,智能应用rule,求出一个Cost最低的执行计划。需要纠正很多人误区的一点是,CBO其实也是基于rule的,接触到RBO和CBO这两个概念的时候,很容易将他们对立起来。但实际上CBO,可以理解为就是加上Cost的RBO。

Flink SQL 引擎的工作流总结如图所示。

从图中可以看出,一段查询 SQL / 使用TableAPI 编写的程序(以下简称 TableAPI 代码)从输入到编译为可执行的 JobGraph 主要经历如下几个阶段

  1. 将 SQL文本 / TableAPI 代码转化为逻辑执行计划(Logical Plan)

  2. Logical Plan 通过优化器优化为物理执行计划(Physical Plan)

  3. 通过代码生成技术生成 Transformations 后进一步编译为可执行的 JobGraph 提交运行

Flink的背压,怎么解决

Flink背压是生产应用中常见的情况,当程序存在数据倾斜、内存不足状况经常会发生背压,我将从如下几个方面去分析。

1)Flink背压表现

  • 1)运行开始时正常,后面出现大量Task任务等待

  • 2)少量Task任务开始报checkpoint超时问题

  • 3)大量Kafka数据堆积,无法消费

  • 4)Flink UI的BackPressure页面出现红色High标识

2) 反压一般有哪些情况

一般可以细分两种情况:

  • 当前Task任务处理速度慢,比如task任务中调用算法处理等复杂逻辑,导致上游申请不到足够内存。

  • 下游Task任务处理速度慢,比如多次collect()输出到下游,导致当前节点无法申请足够的内存。

3) 频繁反压的影响是什么

频繁反压会导致流处理作业数据延迟增加,同时还会影响到Checkpoint。

Checkpoint时需要进行Barrier对齐,此时若某个Task出现反压,Barrier流动速度会下降,导致Checkpoint变慢甚至超时,任务整体也变慢。

长期或频繁出现反压才需要处理,如果由于网络波动或者GC出现的偶尔反压可以不必处理。

4)Flink的反压机制

背压时一般下游速度慢于上游速度,数据久积成疾,需要做限流。但是无法提前预估下游实际速度,且存在网络波动情况。

需要保持上下游动态反馈,如果下游速度慢,则上游限速;否则上游提速。实现动态自动反压的效果。

下面看下Flink内部是怎么实现反压机制的。

  • 1)每个TaskManager维护共享Network BufferPool(Task共享内存池),初始化时向Off-heap Memory中申请内存。

  • 2)每个Task创建自身的Local BufferPool(Task本地内存池),并和Network BufferPool交换内存。

  • 3)上游Record Writer向 Local BufferPool申请buffer(内存)写数据。如果Local BufferPool没有足够内存则向Network BufferPool申请,使用完之后将申请的内存返回Pool。

  • 4)Netty Buffer拷贝buffer并经过Socket Buffer发送到网络,后续下游端按照相似机制处理。

  • 5)当下游申请buffer失败时,表示当前节点内存不够,则逐层发送反压信号给上游,上游慢慢停止数据发送,直到下游再次恢复。

5)反压如何处理

  • 查看Flink UI界面,定位哪些Task出现反压问题

  • 查看代码和数据,检查是否出现数据倾斜

  • 如果发生数据倾斜,进行预聚合key或拆分数据

  • 加大执行内存,调整并发度和分区数

  • 其他方式。。。

由于篇幅有限,更多Flink反压内容请查看我的相关文章:万字趣解Flink背压

Flink的exactly-once怎么保障

精准一次消费需要整个系统各环节均保持强一致性,包括可靠的数据源端(数据可重复读取、不丢失) 、可靠的消费端(Flink)、可靠的输出端(幂等性、事务)。

Flink保持精准一次消费主要依靠checkpoint一致性快照和二阶段提交机制。

1)数据源端

Flink内置FlinkKafkaConsumer类,不依赖于 kafka 内置的消费组offset管理,在内部自行记录并维护 kafka consumer 的offset。

(1)管理offset(手动提交)并保存到checkpoint中

(2)FlinkKafkaConsumer API内部集成Flink的Checkpoint机制,自动实现精确一次的处理语义。

从源码中看到stateBackend中把offset state恢复到restoredState,然后从fetcher拉取最新的offset数据,随后将offset存入到stateBackend中;最后更新xcheckpoint。

2)Flink消费端

Flink内部采用一致性快照机制来保障Exactly-Once的一致性语义。

通过间隔时间自动执行一致性检查点(Checkpoints)程序,b并异步插入barrier检查点分界线。整个流程所有的operator均会进行barrier对齐->数据完成确认->checkpoints状态保存,从而保证数据被精确一次处理。

3)输出端

Flink内置二阶段事务提交机制和目标源支持幂等写入。

幂等写入就是多次写入会产生相同的结果,结果具有不可变性。在Flink中saveAsTextFile算子就是一种比较典型的幂等写入。

二阶段提交则对于每个checkpoint创建事务,先预提交数据到sink中,然后等所有的checkpoint全部完成后再真正提交请求到sink, 并把状态改为已确认,从而保证数据仅被处理一次。

为checkpoint创建事务,等到所有的checkpoint全部真正的完成后,才把计算结果写入到sink中。

Flink怎么处理迟到数据

  1. Flink内置watermark机制,可在一定程度上允许数据延迟

  2. 程序可在watermark的基础上再配置最大延迟时间

  3. 开启侧输出流,将延迟的数据输出到侧输出流

  4. 程序内部控制,延迟过高的数据单独进行后续处理

Flink的双流JOIN

Flink双流JOIN主要分为两大类。一类是基于原生State的Connect算子操作,另一类是基于窗口的JOIN操作。其中基于窗口的JOIN可细分为window join和interval join两种。

实现原理:底层原理依赖Flink的State状态存储,通过将数据存储到State中进行关联join, 最终输出结果。

1)基于Window Join的双流JOIN实现机制

通俗理解,将两条实时流中元素分配到同一个时间窗口中完成Join。两条实时流数据缓存在Window State中,当窗口触发计算时执行join操作。

  • join算子操作

两条流数据按照关联主键在(滚动、滑动、会话)窗口内进行inner join, 底层基于State存储,并支持处理时间和事件时间两种时间特征,看下源码:

windows窗口、state存储和双层for循环执行join()实现双流JOIN操作,但是此时仅支持inner join类型。

  • coGroup算子操作

coGroup算子也是基于window窗口机制,不过coGroup算子比Join算子更加灵活,可以按照用户指定的逻辑匹配左流或右流数据并输出,达到left join和right join的目的。

orderDetailStream
  .coGroup(orderStream)
  .where(r -> r.getOrderId())
  .equalTo(r -> r.getOrderId())
  .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
  .apply(new CoGroupFunction<OrderDetail, Order, Tuple2<String, Long>>() {
    @Override
    public void coGroup(Iterable<OrderDetail> orderDetailRecords, Iterable<Order> orderRecords, Collector<Tuple2<String, Long>> collector)  {
      for (OrderDetail orderDetaill : orderDetailRecords) {
        boolean flag = false;
        for (Order orderRecord : orderRecords) {
          // 右流中有对应的记录
          collector.collect(new Tuple2<>(orderDetailRecords.getGoods_name(), orderDetailRecords.getGoods_price()));
          flag = true;
        }
        if (!flag) {
          // 右流中没有对应的记录
          collector.collect(new Tuple2<>(orderDetailRecords.getGoods_name(), null));
        }
      }
    }
  })
  .print();
复制代码

2)基于Interval Join的双流JOIN实现机制

Interval Join根据右流相对左流偏移的时间区间(interval)作为关联窗口,在偏移区间窗口中完成join操作。

满足数据流stream2在数据流stream1的 interval(low, high)偏移区间内关联join。interval越大,关联上的数据就越多,超出interval的数据不再关联。

实现原理:interval join也是利用Flink的state存储数据,不过此时存在state失效机制ttl,触发数据清理操作。

val env = ...
// kafka 订单流val orderStream = ... 
// kafka 订单明细流val orderDetailStream = ...
    
orderStream.keyBy(_.1)
    // 调用intervalJoin关联
    .intervalJoin(orderDetailStream._2)
    // 设定时间上限和下限
    .between(Time.milliseconds(-30), Time.milliseconds(30))  
    .process(new ProcessWindowFunction())
    
class ProcessWindowFunction extends ProcessJoinFunction...{
   override def processElement(...) {
      collector.collect((r1, r2) => r1 + " : " + r2)
   }
}
复制代码

订单流在流入程序后,等候(low,high)时间间隔内的订单明细流数据进行join, 否则继续处理下一个流。interval join目前也仅支持inner join。

3)基于Connect的双流JOIN实现机制

对两个DataStream执行connect操作,将其转化为ConnectedStreams, 生成的Streams可以调用不同方法在两个实时流上执行,且双流之间可以共享状态。

两个数据流被connect之后,只是被放在了同一个流中,内部依然保持各自的数据和形式,两个流相互独立。

[DataStream1, DataStream2] -> ConnectedStreams[1,2]

我们可以在Connect算子底层的ConnectedStreams中编写代码,自行实现双流JOIN的逻辑处理。

  • 1)调用connect算子,根据orderid进行分组,并使用process算子分别对两条流进行处理。
orderStream.connect(orderDetailStream)
  .keyBy("orderId", "orderId")
  .process(new orderProcessFunc());
复制代码
  • 2)process方法内部进行状态编程, 初始化订单、订单明细和定时器的ValueState状态。
private ValueState<OrderEvent> orderState;
private ValueState<TxEvent> orderDetailState;
private ValueState<Long> timeState;

// 初始化状态Value
orderState = getRuntimeContext().getState(
 new ValueStateDescriptor<Order>
 ("order-state",Order.class));
····
复制代码
  • 3)为每个进入的数据流保存state状态并创建定时器。在时间窗口内另一个流达到时进行join并输出,完成后删除定时器。
@Override
public void processElement1(Order value, Context ctx, Collector<Tuple2<Order, OrderDetail>> out){
  if (orderDetailState.value() == null){
    //明细数据未到,先把订单数据放入状态
     orderState.update(value);
    //建立定时器,60秒后触发
     Long ts = (value.getEventTime()+10)*1000L;
     ctx.timerService().registerEventTimeTimer(
       ts);
     timeState.update(ts);
  }else{
    //明细数据已到,直接输出到主流
     out.collect(new Tuple2<>(value,orderDetailS
       tate.value()));
    //删除定时器
     ctx.timerService().deleteEventTimeTimer
      (timeState.value());
     //清空状态,注意清空的是支付状态
      orderDetailState.clear();
      timeState.clear();
  }
}
...
@Override
public void processElement2(){
  ...
}
复制代码
  • 4)未及时达到的数据流触发定时器输出到侧输出流,左流先到而右流未到,则输出左流,反之输出右连流。
@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<Order, OrderDetail>> out) {
  // 实现左连接
   if (orderState.value() != null){
       ctx.output(new OutputTag<String>("left-jo 
       in") {}, 
       orderState.value().getTxId());
   // 实现右连接
   }else{
      ctx.output(new OutputTag<String>("left-jo 
       in") {}, 
       orderDetailState.value().getTxId());
   }
   orderState.clear();
   orderDetailState.clear();
   timeState.clear();
}
复制代码

4)Flink双流JOIN问题处理总结

  • 1)为什么我的双流join时间到了却不触发,一直没有输出

检查一下watermark的设置是否合理,数据时间是否远远大于watermark和窗口时间,导致窗口数据经常为空

  • 2)state数据保存多久,会内存爆炸吗

state自带有ttl机制,可以设置ttl过期策略,触发Flink清理过期state数据。建议程序中的state数据结构用完后手动clear掉。

  • 3)我的双流join倾斜怎么办

join倾斜三板斧: 过滤异常key、拆分表减少数据、打散key分布。当然可以的话我建议加内存!加内存!加内存!!

  • 4)想实现多流join怎么办

目前无法一次实现,可以考虑先union然后再二次处理;或者先进行connnect操作再进行join操作,仅建议~

  • 5)join过程延迟、没关联上的数据会丢失吗

这个一般来说不会,join过程可以使用侧输出流存储延迟流;如果出现节点网络等异常,Flink checkpoint也可以保证数据不丢失。

Flink数据倾斜怎么处理

数据倾斜一般都是数据Key分配不均,比如某一类型key数量过多,导致shuffle过程分到某节点数据量过大,内存无法支撑。

1)数据倾斜可能的情况

那我们怎么发现数据倾斜了呢?一般是监控某任务Job执行情况,可以去Yarn UI或者Flink UI观察,一般会出现如下状况:

  • 发现某subTask执行时间过慢

  • 传输数据量和其他task相差过大

  • BackPressure页面出现反压问题(红色High标识)

结合以上的排查定位到具体的task中执行的算子,一般常见于Keyed类型算子:比如groupBy()、rebance()等产生shuffle过程的操作。

2)数据倾斜的处理方法

  • 数据拆分。如果能定位的数据倾斜的key,总结其规律特征。比如发现包含某字符,则可以在代码中把该部分数据key拆分出来,单独处理后拼接。

  • key二次聚合。两次聚合,第一次将key加前缀聚合,分散单点压力;随后去除前缀后再次聚合,得到最终结果。

  • 调整参数。加大TaskManager内存、keyby均衡等参数,一般效果不是很好。

  • 自定义分区或聚合逻辑。继承分区划分、聚合计算接口,根据数据特征和自定义逻辑,调整数据分区并均匀打散数据key。

Flink数据重复怎么办

一般来说Flink可以开启exactly-once机制,可保证精准一次消费。但是如果存在数据处理过程异常导致数据重复,可以借助一些工具或者程序来处理。

建议数据量不大的话可以使用flink自身的state或者借助bitmap结构;稍微大点可以用布隆过滤器或hyperlog工具;其次使用外部介质(redis或hbase)设计好key就行自动去重,只不过会增加处理过程。

总结一下Flink的去重方式:

  • 内存去重。采用Hashset等数据结构,读取数据中类似主键等唯一性标识字段,在内存中存储并进行去重判断。

  • 使用Redis Key去重。借助Redis的Hset等特殊数据类型,自动完成Key去重。

  • DataFrame/SQL场景,使用group by、over()、window开窗等SQL函数去重

  • 利用groupByKey等聚合算子去重

Flink实时数仓架构,为什么这么设计

实时数仓数据规整为层级存储,每层独立加工。整体遵循由下向上建设思想,最大化数据赋能。

1)数仓分层设计

  • 数据源: 分为日志数据和业务数据两大类,包括结构化和非结构化数据。

  • 数仓类型:根据及时性分为离线数仓和实时数仓

  • 技术栈:

  • 采集(Sqoop、Flume、CDC)

  • 存储(Hive、Hbase、Mysql、Kafka、数据湖)

  • 加工(Hive、Spark、Flink)

  • OLAP查询(Kylin、Clickhous、ES、Dorisdb)等。

2)数仓架构设计

整体采用Lambda架构。保留实时、离线两条处理流程,即最终会同时构建实时数仓和离线数仓。

  1. 技术实现
  • 使用Flink和Kafka、Hive为主要技术栈

  • 实时技术流程。通过实时采集程序同步数据到Kafka消息队列

  • Flink实时读取Kafka数据,回写到kafka ods贴源层topic

  • Flink实时读取Kafka的ods层数据,进行实时清洗和加工,结果写入到kafka dwd明细层topic

  • 同样的步骤,Flink读取dwd层数据写入到kafka dws汇总层topic

  • 离线技术流程和前面章节一致

  • 实时olap引擎查询分析、报表展示

  1. 优缺点
  • 两套技术流程,全面保障实时性和历史数据完整性

  • 同时维护两套技术架构,维护成本高,技术难度大

  • 相同数据源处理两次且存储两次,产生大量数据冗余和操作重复

  • 容易产生数据不一致问题

3)数据流程设计

整体从上而下,数据经过采集 -> 数仓明细加工、汇总 -> 应用步骤,提供实时数仓服务。

标签: flink 大数据

本文转载自: https://blog.csdn.net/tianhouquan/article/details/128921843
版权归原作者 Yaphets丶混世大魔王 所有, 如有侵权,请联系我们删除。

“Flink面试题”的评论:

还没有评论