0


【Byzer】Python Daemon 暴增的问题排查

背景

Byzer 介绍

Byzer 作为一门面向 Data 和 AI 的新一代编程语言。具体信息可以点击下面链接了解:https://docs.byzer.org/#/byzer-lang/zh-cn/introduction/byzer_lang_design

修复如下 Github issues:
https://github.com/byzer-org/byzer-lang/issues/1652

问题描述

后台发现 daemon 进程增长很快!而正常情况是一个

  1. python env

应该只有一个进程。

下图可以看到多个 daemon:
在这里插入图片描述

影响范围

大致从

  1. driver

节点的系统资源分析了一下,发现问题比较严重,注意有以下几点:

  • 进程常驻,不会自动销毁,会导致创建过多导致 CPU 过高
  • 占用内存较高,跟 daemon 中任务使用的资源有关
  • 大量 IDEL 进程,实际运行很短,然后就进入阻塞状态

下面是具体的分析过程。

经过大量验证,发现问题属于偶发现象,daemon 会长时间驻留内存,直到

  1. Byzer

引擎重启。从下图可以看到,启动时间

  1. 334

  1. 826

都有泄露的进程,并且不会自动销毁。

在这里插入图片描述

于是我打算尝试 kill 进程后,对比基线看看对系统资源的影响。

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

可以发现,第一次 kill 直接释放了 1.4G 左右的内存,后面的任务没有释放那么多,但都是有不同程度的释放。 因为我执行的 python 任务有的是空跑,有的比较吃资源,应该跟 daemon 任务使用的资源是相关的。

查看当前进程资源和堆栈情况。

使用

  1. pstree

,查看相关进程:

在这里插入图片描述

发现几个特点,在 java 端使用

  1. bash -c

创建 daemon 的进程只有一个,即问题大概率出现在

  1. python

侧。泄露的这些 daemon,都有相同的

  1. ppid

,应该是在 daemon 中

  1. fork

出来的新进程,而且前面

  1. ps

命令看到的状态

  1. S1

也印证了这一点。

在这里插入图片描述

通过使用 python 工具

  1. py-spy

排查堆栈:

  1. sudo py-spy top --pid 76366
  1. top

: 实时查看每个函数运行时间并统计

显示如下:

在这里插入图片描述

观察上图中实时统计的峰顶,发现是卡在了

  1. _poll_connectivity

上面,对应的代码是下面这行:

  1. while True:
  2. event = channel.watch_connectivity_state(connectivity,
  3. time.time()+0.2)

google 了一下发现一个类似的讨论:

https://github.com/grpc/grpc/issues/3064

应该是进程阻塞在了完成队列上面。再往上追溯没有找到具体的调用了,需要后续对

  1. pyjava

代码详细分析…

问题复现

因为该问题是偶发的,我尝试了比较多的方式复现该问题。一开始测试了正常的任务,包括不同的

  1. python env

配置下 daemon 的表现,

  1. dataMode

设置为 data 和 model 时的表现,

  1. runIn

在 driver 和 executor 时的表现,不同的

  1. schema

时的表现,发现如果代码逻辑正常daemon是不会增长的。于是看了下报错时任务的表现,定位到

  1. schema

(即输出表结构)是错误的时候,且运行在 driver 端,会出现该问题。

设置一个错误的类型,并在 driver 执行(executor 抛异常不会导致 daemon 增长):

  1. !python env "PYTHON_ENV=source activate ray1.8.0";
  2. !python conf "schema=st(field(top,map(string,string)))";
  3. !python conf "runIn=driver";
  4. !python conf "dataMode=model";
  5. run command as Ray.`` where
  6. inputTable="command"
  7. and outputTable="output"
  8. and code='''
  9. from pyjava.api.mlsql import RayContext, PythonContext
  10. context.build_result([{"top": {"a": "string"}}])
  11. ''';

执行后会得到一个异常,堆栈信息如下:

  1. Error while decoding: java.lang.UnsupportedOperationException
  2. createexternalrow(staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$, ObjectType(interface scala.collection.Map), toScalaMap, staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(lambdavariable(MapObject, StringType, true, -1), lambdavariable(MapObject, StringType, true, -1).toString, input[0, map<string,string>, true].keyArray, None).array, true, false), staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(lambdavariable(MapObject, StringType, true, -2), lambdavariable(MapObject, StringType, true, -2).toString, input[0, map<string,string>, true].valueArray, None).array, true, false), true, false), StructField(top,MapType(StringType,StringType,true),true))
  3. java.lang.RuntimeException: Error while decoding: java.lang.UnsupportedOperationException
  4. createexternalrow(staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$, ObjectType(interface scala.collection.Map), toScalaMap, staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(lambdavariable(MapObject, StringType, true, -1), lambdavariable(MapObject, StringType, true, -1).toString, input[0, map<string,string>, true].keyArray, None).array, true, false), staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(lambdavariable(MapObject, StringType, true, -2), lambdavariable(MapObject, StringType, true, -2).toString, input[0, map<string,string>, true].valueArray, None).array, true, false), true, false), StructField(top,MapType(StringType,StringType,true),true))
  5. org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:186)
  6. tech.mlsql.ets.Ray.$anonfun$distribute_execute$20(Ray.scala:182)
  7. scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
  8. scala.collection.Iterator$$anon$11.next(Iterator.scala:494)
  9. scala.collection.Iterator.foreach(Iterator.scala:941)
  10. scala.collection.Iterator.foreach$(Iterator.scala:941)
  11. scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
  12. scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
  13. scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
  14. scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:184)
  15. scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:47)
  16. scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
  17. scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
  18. scala.collection.AbstractIterator.to(Iterator.scala:1429)
  19. scala.collection.TraversableOnce.toList(TraversableOnce.scala:299)
  20. scala.collection.TraversableOnce.toList$(TraversableOnce.scala:299)
  21. scala.collection.AbstractIterator.toList(Iterator.scala:1429)
  22. tech.mlsql.ets.Ray.distribute_execute(Ray.scala:180)
  23. tech.mlsql.ets.Ray.train(Ray.scala:56)
  24. tech.mlsql.dsl.adaptor.TrainAdaptor.parse(TrainAdaptor.scala:116)
  25. streaming.dsl.ScriptSQLExecListener.execute$1(ScriptSQLExec.scala:408)
  26. streaming.dsl.ScriptSQLExecListener.exitSql(ScriptSQLExec.scala:447)
  27. streaming.dsl.parser.DSLSQLParser$SqlContext.exitRule(DSLSQLParser.java:296)
  28. org.antlr.v4.runtime.tree.ParseTreeWalker.exitRule(ParseTreeWalker.java:47)
  29. org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:30)
  30. org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:28)
  31. streaming.dsl.ScriptSQLExec$._parse(ScriptSQLExec.scala:160)
  32. streaming.dsl.ScriptSQLExec$.parse(ScriptSQLExec.scala:147)
  33. streaming.rest.RestController.$anonfun$script$1(RestController.scala:153)
  34. tech.mlsql.job.JobManager$.run(JobManager.scala:74)
  35. tech.mlsql.job.JobManager$$anon$1.run(JobManager.scala:91)
  36. java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  37. java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  38. java.lang.Thread.run(Thread.java:748)

问题定位

看到多个daemon被创建第一反应是不是什么原因导致java侧创建python daemon的脚本被多次执行。

控制创建daemon的逻辑如下:

  1. private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
  2. def createPythonWorker(pythonExec: String, envVars: Map[String, String], conf: Map[String, String]): java.net.Socket = {
  3. synchronized {
  4. val key = (pythonExec, envVars)
  5. pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars, conf)).create()
  6. }

可以看到,如果一个

  1. python worker factory

已经创建了,就会被缓存到

  1. pythonWorkers

中。因为key只有pythonExec, envVars,没有 hint 中设置的

  1. python conf

,如果用户 pythonExec 中

  1. code

  1. python env

环境没有变化,就会取到创建好的

  1. python worker factory

,而 daemon进程相关信息就是保存在 factory 中的。*

经过断点调试,也确实印证了前面观察的结果,java 侧只创建了一次 daemon,因为相同

  1. conda

环境时,的的确确是从缓存中取到的,即使上一次执行报错。而不同的

  1. conda

环境确实也会导致daemon增长,但属于正常且可控的。具体的代码逻辑梳理成了单独的文档:Ray插件java侧代码走读

回过头来仔细看了一下异常堆栈,发现是在函数

  1. stage1_schema_encoder

执行时报错(也是问题的所在),因为返回的结果不能转换为

  1. map

结构。可以看代码如下:

  1. val javaContext = new JavaContext()
  2. val commonTaskContext = new AppContextImpl(javaContext, batch)
  3. val columnarBatchIter = batch.compute(Iterator(newIter), 0, commonTaskContext)
  4. val data = columnarBatchIter.flatMap { batch =>
  5. batch.rowIterator.asScala.map(f =>
  6. stage1_schema_encoder(f)
  7. )
  8. }.toList
  9. javaContext.markComplete
  10. javaContext.close
  11. val rdd = session.sparkContext.makeRDD[Row](data)
  12. session.createDataFrame(rdd, stage1_schema)
  13. }

而后面对于 javaContext 状态和关闭的操作因为异常没有被执行到,我于是 dig 到如下的错误case。

  • 不更新 context 状态会导致 monitor 不会去杀掉僵尸的进程;
  • 而不手动关闭,会导致 daemon 一直阻塞在完成队列上。

具体的 dig 流程在下面阐述。

javaContext.markComplete

我们知道 Byzer 是会自动 check 和关闭无用 worker 的,但是如果状态没有被设置为完成或者异常,会导致monitor不会去自动杀掉任务,因为 while 条件

  1. !context.isInterrupted && !context.isCompleted

永真,后续是否可以加一下任务超时控制?

  1. override def monitor(callback: () => Unit) = {
  2. (taskKillTimeout: Long, pythonExec: String, envVars: Map[String, String], worker: Socket) => {
  3. // Kill the worker if it is interrupted, checking until task completion.
  4. // TODO: This has a race condition if interruption occurs, as completed may still become true.
  5. while (!context.isInterrupted && !context.isCompleted) {
  6. Thread.sleep(2000)
  7. }
  8. if (!context.isCompleted) {
  9. Thread.sleep(taskKillTimeout)
  10. if (!context.isCompleted) {
  11. try {
  12. // Mimic the task name used in `Executor` to help the user find out the task to blame.
  13. val taskName = s"${context.partitionId}"
  14. logWarning(s"Incomplete task $taskName interrupted: Attempting to kill Python Worker")
  15. PythonWorkerFactory.destroyPythonWorker(pythonExec, envVars, worker)
  16. } catch {
  17. case e: Exception =>
  18. logError("Exception when trying to kill worker", e)
  19. }
  20. }
  21. }
  22. }
  23. }

javaContext.close

关闭方法也比较重要,dig 了一下发现是关闭 arrow vector 和 arrow allocator:

  1. val allocator =
  2. ArrowUtils.rootAllocator.newChildAllocator("toBatchIterator", 0, Long.MaxValue)
  3. val root = VectorSchemaRoot.create(arrowSchema, allocator)
  4. val unloader = new VectorUnloader(root)
  5. val arrowWriter = ArrowWriter.create(root)
  6. context match {
  7. case c: AppContextImpl => c.innerContext.asInstanceOf[JavaContext].addTaskCompletionListener { _ =>
  8. root.close()
  9. allocator.close()
  10. }

我于是做了如下修改(右边为修改后的代码):
在这里插入图片描述
在将两行操作移到了 finally 位置,并设置一个比较容易理解的异常 message。


Join Byzer Community

加入我们的slack channel!

https://join.slack.com/t/byzer-org/shared_invite/zt-10qgl60dg-lX4fFggaHyHB6GtUmer_xw

标签: 后端 开发语言

本文转载自: https://blog.csdn.net/m0_58032574/article/details/122397636
版权归原作者 张哈希 所有, 如有侵权,请联系我们删除。

“【Byzer】Python Daemon 暴增的问题排查”的评论:

还没有评论