0


python 多进程队列批量处理数据

问题

做生信分析时,会遇到很多样品需要用同一个脚本去分析的情况。这些脚本除了样本名字需要更改,其他的都一样。有时候就会遇到这样的尴尬情况:
(1)如果这些样品一次性提交,然后在后台一个一个的依次分析比较费时间。
(2)如果同时把这些样品全部分析,服务器又带不动。
(3)每次同时分析3个,结束了再提交3个,这个样子就会一直在忙着查看程序进程及提交新的程序,非常累。

用一个比较容易理解的例子来讲就是:服务器出了一系列bug。如果只招一个程序员,一个一个的去修改,太慢了。如果招上和bug数量相等的程序员,每人解决一个bug,经费又不够。如果招上几个程序员,把bug平均分配,每人负责几个,那么必定有人很快把分到的bug解决完了,但是有人就是很慢。服务器想运行就得等着最后那个程序员把他负责的bug解决完。

解决思路

借鉴集群提交任务的思路,借助python的队列和多进程模块,写了一个非常实用的自动化运行小脚本。
能够实现:
(1)把任务全部存到队列里.
(2)任何时间都只有自己规定数量的任务在并行运行,这样防止任务过多服务器带不动。
(3)某个进程运行的任务结束后,就从队列中获取新的任务继续运行。

简单点理解就是说。服务器出了一系列bug。现在招上几个程序员,他们共同处理这几个bug。一个程序员在处理完拿到的任务后,就到队列拿到新的任务去处理。这个可以很快把所有问题都解决完。

实现

from multiprocessing import Process,Queue
import os

defWorker(q):whileTrue:
        task=q.get()#task就是你的sample名字if task isNone:break
        os.system('sh do_task.sh {}.fa'.format(task))#以linux环境为例,这里输入你具体的程序运行命令行
q=Queue()
fa=open('task.txt')for line in fa:
    sample=line.rstrip()
    q.put(sample)#把要处理的样品加入队列
fa.close()
workers=[]for i inrange(3):#任何时间三个任务并行
    q.put(None)#在队列queue最后加上None,使样品处理完后程序结束运行。
    worker=Process(target=Worker,args=(q,))
    worker.start()
    workers.append(worker)for worker in workers:
    worker.join()#进程阻塞,保证任务全部完成后再处理后续命令print('Finished successfully')

task.txt内容举例:
sample1
sample2
sample3
sample4
sample5
sample6

实际在使用时,只需要保证样品名称成功load到队列中及更改程序运行的命令那一行即可,使用起来非常方便。

标签: python 队列 多进程

本文转载自: https://blog.csdn.net/weixin_43745169/article/details/120607429
版权归原作者 Chipeyown 所有, 如有侵权,请联系我们删除。

“python 多进程队列批量处理数据”的评论:

还没有评论