1. 任务启动报错Trying to access closed classloader.
Exception in thread "Thread-5"java.lang.IllegalStateException:Tryingtoaccess closed classloader. Please check if you store
classloaders directly or indirectly in staticfields.
If the stacktrace suggests that the leak occurs in a third party
library and cannot be fixed immediately, you can disable this check
withthe configuration 'classloader.check-leaked-classloader'.
此错误虽不影响任务的正常启动,但可以通过在
flink-conf.yaml
文件中添加
classloader.check-leaked-classloader: false
选项后,后续提交任务不会再提示。
2. 资源(memory和vcores)充足,但提交任务阻塞后报错
此错误表象为yarn集群资源充足,在提交任务时也提示“ACCEPT“,但会阻塞一段时间后报错。
问题:可能是AM资源超出了限制
在Yarn集群界面上"Scheduler"里可以看到,
Max Application Master Resources
和
Used Application Master Resources
两个指标中,已使用的内存已经超过最大限制。
解决方法:在
capacity-scheduler.xml
中修改
yarn.scheduler.capacity.maximun-am-resource-percent
选项,默认是0.2,可以调大一点。
为什么不看vcores,只看内存限制呢。
3. Yarn集群有多个节点,但任务只集中分布在其中几个节点
通过指令
yarn node -all -list
拉取集群当前节点状态,发现有一些节点containers的数量很大,但有一些节点依然是0。虽然那些节点状态都是RUNNING(健康)状态,但就是不接收任务。
通过排查发现,这些节点不是不接收任务,而是在执行任务时报错,导致yarn集群会重新把任务分配给其他正常节点,最终形成只有部分节点有任务的现象。这些不正常节点在接收任务时报错如下(可以在yarn界面查看,点开具体的applicationID,中间有个Diagnostics):
去到正常和异常的节点下比较,确实异常节点缺失这个文件夹(用于存储运行时nodemanager和taskmanager的日志),怀疑是部署中间件框架时遗漏,通过手动增加文件夹的方式,问题解决。
4. Flink任务失败后,log找不到
flink任务执行过程中,日志可以通过flink的ui界面上可以查看(从yarn的application master跳转)。但任务一旦因为未知错误死亡时,在flink history service里并不能查到所有的日志,有时上面写的exception并不是root cause。
这时可以配置yarn集群的日志聚集(log aggregation)功能,在
yarn-site.xml
文件里配置
yran.log-aggregation-enable=true
功能打开后flink任务结束(不管是否正常结束)时,任务执行的日志会被统一收集。可以在yarn界面查看,点开具体的applicationID,在最下面有个
logs
可以进行查看。但是这里只能查看到
jobManager
相关的日志,
taskManager
需要自己手动拼接地址。例如
jobManager
日志地址为:
http://qyfwzx-dev-esbmon4:19888/jobhistory/logs/qyfwzx-dev-esbmon4:45454/container_e18_1634207619484_0505_01_000001/container_e18_1634207619484_0505_01_000001/flink
其中
http://qyfwzx-dev-esbmon4:19888/jobhistory/logs
为jobhistoryserver
的地址,不用变。qyfwzx-dev-esbmon4:45454
是jobManager
当时运行时的宿主yarn node地址container_e18_1634207619484_0505_01_000001
则是yarn当时运行时容器id/flink
则是提交任务时使用的用户名
所以我们只需要知道
taskmanager
的容器id和节点地址就能找到它的日志。这里在
jobManager.log
里搜索关键字“
Registering TaskManager
”,可以找到当时任务执行时
taskManger
的信息。
containerId
后面跟的就是当时运行
container
的节点地址。
按照连接的拼装方式,可以得到
taskManger
日志的地址:
http://qyfwzx-dev-esbmon4:19888/jobhistory/logs/qyfwzx-dev-esbmon1:45454/container_e18_1719387982584_0082_01_000002/container_e18_1719387982584_0082_01_000002/flink
5. Flink任务执行过程中,checkpoint太大导致失败
问题:Flink任务执行一段时间,会自动Restart,重启几次后任务失败。查看log里有以下错误提示。
Size of the state is larger than the maximum permitted memory-backed
state. Size=5244975, maxSize=5242880.Consider using a different
state backend, like the FileSystemState backend.
这个是checkpoint写入文件过大导致的,可以通过设置
flink-conf.yaml
文件中的选项:
state.backend.incremental:true
,此选项可以通过比对只保留增量变化的checkpoint内容,开启后,checkpoint的size大大缩小。state.backend: filesystem
,此选项是将checkpoint写入文件系统,值默认是HashMapStateBackend
,即以java对象的形式放入内存state.checkpoints.dir
,还可以指定外部hdfs地址作为存储
6. Flink任务并行度该如何设置
并行度的设置需要通过压测来决定,测试时主要观察单并行度的处理上限。即先从源头(比如kafka)积压数据,之后开启 Flink 任务,出现反压(就是处理瓶颈)时,从flink ui上查看单个任务的输出量:numRecordsOutPerSecond。然后通过 总QPS / 单并行度的处理能力 = 并行度,最终设置为并行度*1.2 倍,富余一些资源。
以下是一些常用准则:
source端
- 数据源端是kafka,source的并行度设置为kafka对应topic的分区数。
- 如果已经等于kafka的分区数,消费速度仍更不上数据生产速度,考虑下kafka要扩大分区,同时调大并行度等于分区数。
- flink的一个并行度可以处理一至多个分区的数据,如果并行度多于kafka的分区数,那么就会造成有的并行度空闲,浪费资源
Process端
- keyby之前的算子,比如map、fliter、flatmap等处理较快的算子,并行度和source保持一致即可。
- keyby之后的算子,视具体情况而定,可以通过测试反压的方法,得到keyby算子上游的数据发送量和该算子的处理能力来得到合理的并行度(在无倾斜情况下)
sink端
- sink端是数据流向下游的地方,可以根据sink端的数据量及下游的服务抗压能力进行评估。
- 如果sink端是kafka,可以设为kafka对应topic的分区数。
- sink端的数据量若比较小,比如一些高度聚合或者过滤比较大的数据(比如监控告警),可以将并行度设置的小一些。
- 如果source端的数据量最小,拿到source端流过来的数据后做了细粒度的拆分,数据量不断的增加,到sink端的数据量非常大的这种情况,就需要提高并行度。
7. Flink任务报错超出内存
任务在执行一段时间后报错
java.util.concurrent.ExecutionException:java.lang.OutOfMemoryError:GC overhead limit exceeded
at java.util.concurrent.FutrueTask.report(FutureTask.java:122)
这个问题其实要先了解flink的内存模型,参考文章 Flink TaskManager内存管理机制
其实就是
Task Heap
设置的比较小,然后用户自己写的flink任务逻辑比较复杂或是数据量比较大,存储的数据较多超出了内存。
按照上述说明,再根据当前的flink的配置,发现托管内存默认是占用了40%的内存。但在我的任务里这块内存基本上没有使用的,可以调低。通过设置比例值
taskmanager.memory.managed.fraction=0.1
,然后flink会自动调整
Task Heap
的大小。
除此之外,还可以给
taskManager
增加
JVM
启动参数,在
flink-conf
文件下增加:
env.java.opts.taskmanager:-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/tmp/flink_taskmanager_oom_%p_%t.hprof
当任务进程发生oom时,会自动生成堆转储(heap dump)文件,后续可以通过
jdk
自带的
jvisualvm
工具解析查看堆中各类数据占比,辅助分析问题。
版权归原作者 Space丶Miao 所有, 如有侵权,请联系我们删除。