0


【极客日常】python进程池ProcessPoolExecutor的用法与实现分析

在python开发期间,由于GIL的原因,不能直接采用并行的方式处理代码逻辑。在multiprocessing库的支持下,python程序能够启动子进程执行特定的任务,但子进程的管理也成为了问题。为了简化用户开发成本,python在concurrent.futures下内置了ProcessPoolExecutor这一数据结构,实现了简单的进程管理及任务调度。如果没有特别的需求,开发者只需要用ProcessPoolExecutor即可实现并行执行任务。因此,本文简单对ProcessPoolExecutor的实现进行分析,帮助大家更加了解python开发中进程/任务调度的一种方式。

首先来看ProcessPoolExecutor的用法,可以参考官方文档

  • constructor:构造器 - max_workers:最大worker数量- context:进程启动方式,比如spawn、fork等。可以参考这篇文章- initializer:初始化环境用的回调/钩子,会在传进去的任务执行之前调用(比如要import什么库,读取什么配置之类)- initargs:初始化回调的参数
  • submit:提交特定任务,返回一个future实例(类似于js的promise)
  • map:批量submit
  • shutdown:关闭进程池并销毁资源

可以看到用法非常简单,用户一侧只需要这样操作即可得到任务执行结果:

from concurrent.futures import ProcessPoolExecutor

deftask(sleep_sec=10, tag='test'):print('[%s] start sleep'% tag)
    time.sleep(sleep_sec)print('[%s] finish sleep'% tag)return100defmain():
    process_pool = ProcessPoolExecutor(max_workers=3)
    future = process_pool.submit(task,3, tag='TEST')
    ret = future.result()print('result is %s'%str(ret))
    process_pool.shutdown()if __name__ =='__main__':
    main()

然后就可以打印出以下内容:

[TEST] start sleep
[TEST] finish sleep 
result is 100

其中,finish sleep在start sleep打印的3秒后才打印出来

简单的入口后面肯定存在精巧的逻辑。在ProcessExecutorPool源码中,有很清晰的注释去阐述这一数据结构的实现,有兴趣的读者可以直接翻越源码,debug源码来探索其中的逻辑

ProcessPoolExecutor的基础结构如下:

在这里插入图片描述

其中,Queue Management Thread(队列管理线程)是整个ProcessPoolExecutor的核心,不仅控制任务的收发,而且调度任务在不同进程中的执行,并且处理因为各种原因带来的进程池的异常。

以上面代码为例,ProcessPoolExecutorl整个执行流程,可以如下所示:

  • 用户初始化ProcessPoolExecutor - 检查worker数量合法性,设置进程启动方式context- 初始化WorkID队列、WorkID->WorkItem的缓存map、调用队列Call Queue、结果队列Result Queue- 初始化用来激活队列管理线程的pipe - 队列管理线程会对一系列fd/pipe进行管理。pipe包括激活队列管理线程的以及结果队列的;fd则是进程池每个进程的- 队列管理线程会select这些fd/pipe,根据对应事件作出行为
  • 用户submit任务(函数+参数)到ProcessPoolExecutor实例 - 自增WorkID,将任务所需的函数&参数放到WorkItem,然后把这些信息缓存到到WorkID->WorkItem的map里- 若是未启动进程池,先启动进程池。然后启动队列管理线程- 唤醒队列管理线程工作 - 唤醒的方式:给主线程连到激活队列管理线程的pipe发二进制空字符串,会被select到- 切到队列管理线程,线程内主循环先拿到WorkID&WorkItem,同时侦测到被唤醒的signal,于是开始执行正常任务 - 把WorkID跟WorkItem结合为CallItem放到Call Queue里- 进程池每个进程会从Call Queue提取任务执行,将结果ResultItem放到Result Queue。如果进程异常,会直接把pid放到Result Queue- 队列管理线程有个is_broken变量控制是否整个进程池/队列坏了,如果坏了就不能正常执行任务,整个executor都得被销毁- 队列管理线程select到Result Queue并读取其中内容。提取出运行结果后,和缓存的WorkID&WorkItem对上。这样用户侧用future.result()就可以得到结果
  • 用户销毁ProcessPoolExecutor - 将关闭队列管理线程的flag置为true- 队列管理线程在主循环中发现flag置为true,默认会等待所有WorkItem执行完后执行销毁操作- 发送None给到进程池所有进程,这些进程收到None会返回pid,队列管理线程发现pid会自动除掉相应进程索引记录,并等待进程join- 关闭Call Queue,赋值Result Queue为None- 进程与Call Queue全部关闭后,队列管理线程跳出主循环,主线程等待队列管理线程join- 关闭激活队列管理线程的pipe

本文转载自: https://blog.csdn.net/u013842501/article/details/117717200
版权归原作者 utmhikari 所有, 如有侵权,请联系我们删除。

“【极客日常】python进程池ProcessPoolExecutor的用法与实现分析”的评论:

还没有评论