0


时间如水,东流而逝 !Python中的异步,定时任务(Celery,Apscheduler)

Celery PK APScheduler

Celery:我们通常将celery作为一个任务队列来使用,但是celery也有定时任务的功能。当然了,任务就是消息,消息中间件(也就是broker)可以使用redis或者rabbitmq 。

安装Celery模块:

  1. pip install celery

Celery的默认broker(消息中间件)是RabbitMQ, 当然了,也可以使用Redis 。

APScheduler:算是在实际项目中最好用的一个工具库,不仅可以在程序动态的添加和删除定时任务,还支持持久化 。APScheduler(Advanced Python Scheduler)是一个轻量级的Python定时任务调度框架(Python库)。

安装APScheduler库:

  1. pip install apscheduler

APScheduler使用:

APScheduler有四个组件:

triggers: 触发器,用于设定触发任务的条件,触发器包含了调度的逻辑,每个任务都有自己的触发器决定该任务下次运行的时间。
job stores: 任务储存器,用于存放任务,把任务放在内存或者数据库中,一个
executors: 执行器,用于执行任务,可以设定执行模式为单线程或者线程池,任务完毕后,执行器会通知调度器
schedulers: 调度器,上面的三个组件都是参数,使用这三个参数创建调度器实例来运行

Celery简单使用代码展示:

先定义Celery应用:

test_celery.py:

  1. from celery import Celery
  2. from datetime import timedelta # 时间延后
  3. app = Celery("tornado")
  4. # 导入模块
  5. app.conf["imports"] = ["celery_task"]
  6. # 定义broker 代理人
  7. app.conf.broker_url = "redis://:123@localhost:6379"
  8. # 任务结果
  9. app.conf.result_backend = "redis://:123@localhost:6379"
  10. # 时区
  11. app.conf.timezone = "Asia/Shanghai"
  12. # 配置定时任务
  13. app.conf.beat_schedule = {
  14. "task1":{
  15. "task":"celery_task.job", # 指定任务所在模块
  16. "schedule":timedelta(seconds=3),
  17. "args":()
  18. }
  19. }

定义celery任务task:

celery_task.py:

  1. # 导入celery应用
  2. from test_celery import app
  3. from celery import shared_task # 装饰成定时任务
  4. @shared_task
  5. def job():
  6. # 任务逻辑
  7. return "test"

命令窗口开启服务(两个):

  1. # 启动服务(命令窗)
  2. celery -A test_celery worker --pool=solo -l info
  1. celery -A test_celery beat -l info

会出现以下情况:

任务每隔三秒执行一次,这就是使用celery完成的简单定时任务,当然了,在实际工作当中会遇到较为复杂的任务,需要在任务中添加条件,这就要根据具体情况,具体分析!

APScheduler 使用起来还算是比较简单。运行一个调度任务只需要以下三部曲。

  1. 新建一个 schedulers (调度器) 。
  2. 添加一个调度任务(job stores)。
  3. 运行调度任务。

APScheduler的代码展示:

导包:

  1. import os
  2. from datetime import datetime
  3. from tornado.ioloop import IOLoop, PeriodicCallback
  4. from tornado.web import Application, RequestHandler, url
  5. from apscheduler.schedulers.tornado import TornadoScheduler

写数据,初始化任务调度器:

  1. scheduler = None
  2. job_ids = []
  3. # 初始化
  4. def init_scheduler():
  5. global scheduler
  6. scheduler = TornadoScheduler()
  7. scheduler.start()
  8. print("定时任务启动")

声明要执行的任务:

  1. def task(job_id,filename,size,count):
  2. # 写入逻辑判断
  3. count = int(count)
  4. filelist = os.listdir("./static/uploads/")
  5. temp = []
  6. for i in range(count):
  7. for x in filelist:
  8. if x == "{}_{}".format(filename,i):
  9. temp.append(x)
  10. if len(temp) != count:
  11. print("分片未传输成功")
  12. return False
  13. try:
  14. filesize = os.path.getsize("./static/uploads/{}".format(filename))
  15. except Exception as e:
  16. filesize = 0
  17. if int(size) != filesize:
  18. print("分片合并未成功")
  19. return False
  20. # 删除分片
  21. for x in range(count):
  22. os.remove("./static/uploads/{}_{}".format(filename,x))
  23. # 删除定时任务
  24. scheduler.remove_job(job_id)
  25. job_ids.remove(job_id)
  26. print("分片执行完毕")

声明服务控制器:

  1. class SchedulerHandler(RequestHandler):
  2. async def get(self):
  3. job_id = self.get_argument("job_id",None)
  4. filename = self.get_argument("filename",None)
  5. size = self.get_argument("size",None)
  6. count = self.get_argument("count", None)
  7. global job_ids
  8. if job_id not in job_ids:
  9. job_ids.append(job_id)
  10. scheduler.add_job(task,"interval",seconds=3,id=job_id,args=(job_id,filename,size,count))
  11. print("定时任务入队")
  12. res = {"errcode":0,"msg":"ok任务完成"}
  13. else:
  14. print("该任务已经存在")
  15. res = {"errcode":1,"msg":'fail失败'}
  16. self.finish(res)

任务执行:

  1. if __name__ == '__main__':
  2. routes = [url(r"/scheduler/",SchedulerHandler)]
  3. init_scheduler()
  4. # 声明tornado对象
  5. application = Application(routes,debug=True)
  6. application.listen(8888) # 要注意端口号是否被占用问题
  7. IOLoop.current().start()

在现在项目中,使用APScheduler也是比较简单的,掌握好技巧,玩转整个任务区!


本文转载自: https://blog.csdn.net/justzcy/article/details/126029088
版权归原作者 太阳打伞 所有, 如有侵权,请联系我们删除。

“时间如水,东流而逝 !Python中的异步,定时任务(Celery,Apscheduler)”的评论:

还没有评论