目录
简介
celery是基于python的分布式任务队列,一款简单、灵活、可靠的分布式系统。可以在一台机器上运行,也可以在多台机器上运行。
任务队列一般用于线程或计算机之间分配工作的一种机制。
核心
- task, 耗时的任务、定时任务
- broker, 消息中间件,如redis/rabbitmq
- worker, 执行者,并发执行(eventlet,gevent)
- backend, 存储任务的结果,如redis/memcached/Django ORM; 默认禁用,如果不需要结果则禁用;局部禁用 结果
@app.task(ignore_result=True)deftask():return5
- beat, 定时任务的调度器
功能
- 监控集群状态
- 任务调度
- 内存泄漏保护
Celery配置项
app = Celery("myworker")# 消息队列
app.conf.broker_url =""# 结果后台
app.conf.result_backend =""# 时区
app.conf.timezone ="Asia/Shanghai"# 保存结果过期 s
app.conf.result_expires =200
web框架集成
flask,
django,
安装
pip install celery==4.4.7# 同时安装依赖
pip install 'celery[gevent]'
pip install 'celery[redis]'
broker
rabbitmq
生产环境中的首先
# 只需简单配置
broker_url ='amqp://myuser:mypassword@localhost:5672/myvhost'
rabbitmq官网
Ubuntu安装:
$ sudo apt-get install rabbitmq-server
docker运行:
$ docker run -d -p 5462:5462 rabbitmq
https://www.celerycn.io/ru-men/celery-chu-ci-shi-yong
在包中使用Celery
python3中优先绝对路径导入,然后相对路径
- 目录结构
- 代码celery.py
"""
基础设置
"""from celery import Celery
# 实例化
app = Celery("my_worker",# worker名称
broker="redis://:[email protected]:6379/5",# 消息队列
backend="redis://:[email protected]:6379/6",# 结果后端,默认禁用
include=["pkg.tasks"],# celery启动时,需导入的模块)# 更新配置
app.conf.update(
result_expires =3600,# 结果过期时间)if __name__ =="__main__":
app.start()
tasks.py
"""
创建任务函数
"""from.celery import app
# 创建任务函数@app.taskdeftask1():print("简单打印结果。。。")[email protected](ignore_result=True)# 当前任务禁用结果deftask2():print("局部禁用结果")return10
启动worker进程
# windows 使用eventlet、gevent单线程的高并发---协程
celery -A pkg worker -l info -P eventlet -n laufing
# -A 应用# -l log级别# -P 并发形式# -n 主机名 , 一台主机一个名字,起一个worker进程# linux 默认prefork 进程池
celery -A pkg worker -l info
查看worker正在处理的任务:
# 检查集群中所有的worker正在执行的任务
celery -A pkg inspect active
# 监视 指定某些worker 正在执行的任务
celery -A pkg inspect active [email protected],
# 监控worker及执行的任务
celery -A pkg control enable_events
# 查看worker执行状况
celery -A pkg events --dump
# 禁用事件
celery -A pkg control disable_events
# 查看集群中的worker列表
celery -A pkg status
celery定时任务
- 配置 定时任务调度器
# celery.py 内部# 定时任务调度器
beat_shedule ={"task1":{"task":"pkg.tasks.func1",# 任务函数"schedule": crontab(minute="*/1"),# 每1分钟执行一次,将任务提交到任务队列"args":(),# 任务函数的参数},"task2":{},"task3":{},}# 更新配置
app.conf.update(
beat_schedule = beat_schedule
)
- 启动 定时任务调度器
# 启动, 会定时提交任务到异步队列
celery -A pkg beat -l info
# 启动worker进程
celery -A pkg worker -l info -P eventlet -n laufing
定时任务遇到的问题
- 解决方案删除之前启动定时任务的相关文件
以上可以直接在Django中使用Celery的定时任务
后台启动worker进程
linux下:, 注意window平台不支持
# 后台启动 multi start
celery -A pkg multi start my_worker -l info
# 后台 重启 multi restart# 后台 异步停止 multi stop # 后台同步停止 multi stopwait
版权归原作者 laufing 所有, 如有侵权,请联系我们删除。