使用 FastAPI、Celery 和 RabbitMQ 的异步架构
使用 RabbitMQ 和 Celery 的 FastAPI
在我之前的一篇教程中,我们已经了解了如何使用Async IO优化FastAPI应用程序的性能。 要了解更多信息,可以参考使用 FastAPI 进行并发。
当我们想要执行小型 BackgroundTask 或使用并发和并行执行任务时,异步 IO 是合适的。但是,当执行繁重的后台计算或复杂任务时,我们理想情况下不希望将它们作为同一进程的一部分来运行。因此,为了将这些复杂的任务作为单独的过程执行,我们需要专门的工具,例如芹菜。
Celery是一个分布式任务队列,可简化任务分配和处理的管理。任务队列用作跨线程或机器分配工作的机制。任务队列的输入是一个称为任务的工作单元。专用工作进程不断监视任务队列以执行新工作。
作为任务排队系统,Celery 非常适合长时间运行的进程或批量执行的小型可重复任务。Celery 处理的问题类型是常见的异步任务。
Celery通过消息进行通信,通常使用代理在客户端和工作器之间****进行调解。要启动任务,客户端会将消息添加到队列,然后代理会将该消息传递给工作器。RabbitMQ和Redis是 Celery 完全支持的代理传输。在我们的示例中,我们将使用 RabbitMQ 作为代理传输。
RabbitMQ是部署最广泛的开源消息代理。RabbitMQ 轻量级且易于在本地和云中部署。它支持多种消息传递协议。RabbitMQ 可以部署在分布式和联合配置中,以满足高规模、高可用性要求。RabbitMQ 是 Celery 的默认代理,因此除了我们要使用的代理实例的 URL 位置之外,它不需要任何其他依赖项或初始配置。
有多种工具可用于监控和检查 Celery 集群。Flower是一款用于Celery的实时 Web 应用程序监控和管理工具。
下图显示了各个组件如何交互的简化图。在这里,我们使用 FastAPI 作为 Celery 客户端,使用 RabbitMQ 作为消息代理和结果后端。
使用 Celery Flow 的 FastAPI
- 客户端向我们的 FastAPI 应用程序发送请求。
- FastAPI 应用程序将任务消息发送到消息代理。
- Celery 工作器从消息代理中消费消息。任务完成后,它将结果保存到结果后端并更新任务状态。
- 将任务发送到消息代理后,FastAPI 应用还可以从结果后端监控任务的状态。
- Flower 还可以通过处理消息代理上的消息来监控芹菜应用程序任务。
在本教程中,我们将了解如何将Celery集成到FastAPI应用程序中以执行异步任务而不阻止用户的请求。虽然我们在这里使用的示例非常简单,但它仅用于纯粹的演示目的。
带有 FastAPI 的 Celery 应用程序可能会有用的一些用例:
- 在应用程序中将电子邮件作为后台任务发送电子邮件。
- 在后台处理上传的图像。
- ML 模型的离线训练。
- 定期任务,例如报告生成或网页抓取。
以下步骤说明如何使用 Celery 和 FastAPI 执行异步任务:
- 设置和安装
- 设置消息代理
- 添加芹菜
- 添加 Celery 任务
- 添加 APIRouter
- 启动应用程序和 Celery Worker 服务器
- 测试应用程序
- 监控任务
先决条件
我们需要安装有Pipenv和Git 的Python 3.9.2。Pipenv是一个软件包和虚拟环境管理器,可
PIP
在后台使用。它提供更多高级功能,例如版本锁定和项目之间的依赖隔离。
1. 设置和安装
一旦满足先决条件,我们就可以开始创建我们的应用程序。
要开始我们的应用程序,请
fastapi-celery-rabbitmq-application
在磁盘上的任意目录中为我们的项目创建一个名为 的文件夹。
创建项目文件夹
导航到项目文件夹。
激活虚拟环境
进入项目文件夹后,执行以下命令来激活 VirtualEnv。
启动 pipenv shell
虚拟环境现在将被激活,它将提供所需的项目隔离和版本锁定。
安装依赖项
接下来,我们将使用 requirements.txt 中的 Pipenv 安装所有必需的依赖项。
我们的应用程序中使用的所有依赖项
安装所有依赖项
当我们执行上述命令之后,所需的依赖项将被安装。
我们现在可以看到在我们的项目文件夹中创建的两个文件,分别是
Pipfile
和
Pipfile.lock
。
Pipfile
包含我们刚刚安装的所有依赖项的名称。Pipfile.lock
旨在根据中存在的依赖关系指定Pipfile
应使用哪个特定版本,避免自动升级相互依赖的依赖关系并破坏项目依赖关系树的风险。
注意:这里,我们安装了所有具有特定版本的依赖项,这些依赖项在编写本教程时在我的计算机上可以正常工作。如果我们不指定任何版本,则将安装该依赖项的最新版本,这可能与其他依赖项不兼容。
下一步是设置消息代理。
2. 设置消息代理
Celery 需要一个解决方案来发送和接收消息;通常,这以称为消息代理的单独服务的形式出现。有几种选择,RabbitMQ就是其中之一。
在本地机器上运行RabbitMQ 的最简单方法之一是使用Homebrew。Homebrew是MacOS上流行的软件包管理器。RabbitMQ 配方可从Homebrew的核心水龙头(开箱即用)获得。
- 要使用 Homebrew 安装 RabbitMQ 服务器,只需执行以下命令:
Brew 安装 RabbitMQ
- 然后可以使用rabbitmq-server在前台启动服务器,或者使用
brew services start rabbitmq
launch 在后台运行该服务器。 - 一旦我们启动服务器,我们就可以使用默认凭据(
Username: guest, Password: guest
)登录到服务器http://localhost:15672
。
RabbitMQ 服务器
有关 RabbitMQ 的其他安装选项,请查看以下链接:
文档:目录
本页总结了最新补丁版本的 RabbitMQ 文档。请参阅下载和…
我们的消息代理已启动并运行。现在让我们配置 Celery。
3. 加入芹菜
我们已经安装了 Celery 运行所需的所有依赖项。现在让我们配置它。在项目的根目录中,让我们创建一个名为 的文件
main.py
。
<iframe src="https://medium.com/media/7758a2744d039968bb5445c753ec977e"allowfullscreen=""frameborder="0"height="564"width="680"title="异步主程序"class="el n fd dy bg"scrolling="no"style="box-sizing: inherit; top: 0px; width: 680px; height: 563.993px; left: 0px;"></iframe>
主程序
- 在上面的代码中,
line 8
我们16
创建了一个名为 的工厂函数create_app()
,它可以随时调用,并返回一个 FastAPI 应用实例供我们使用。在这个函数中,我们调用一个名为 的函数create_celery()
来创建一个 Celery 应用实例。此外,我们还集成了大学路由模块,我们将在本教程中进一步解释它。 23
至行代码是使用Uvicorn24
运行 FastAPI 应用程序所必需的。FastAPI 是我们用来构建 API 的框架,Uvicorn是我们用来处理请求的服务器。
Celery 配置
现在通过添加文件来开始 celery 配置
config/celery_config.py
。
<iframe src="https://medium.com/media/7ed82baa3418caeede251bca4b3f3117"allowfullscreen=""frameborder="0"height="979"width="680"title="celery 配置"class="el n fd dy bg"scrolling="no"style="box-sizing: inherit; top: 0px; width: 680px; height: 978.993px; left: 0px;"></iframe>
Celery 配置
- 上述文件包含 Celery 运行所需的所有配置。从
line 14
到15
我们定义了 Broker URL 和结果后端。在这里,我们使用在中安装的 RabbitMQstep 2
作为代理和结果后端。 - 从
line 17
到23
我们定义了所有队列,我们将在应用程序中使用这些队列以及默认celery
队列。这里我们将使用动态配置路由规则,而不是在每个任务级别手动配置CELERY_TASK_ROUTES
路由规则。 - 我们添加了一个辅助方法来
line 6
在10
运行时获取每个任务的路由。 - 从
line 32
到39
我们定义了一个函数来在启动时加载芹菜配置。
Celery 应用实例
现在让我们再添加一个文件
config/celery_utils.py
,其中将包含下面的代码来创建芹菜应用程序实例。
<iframe src="https://medium.com/media/a309ace9a24dc1efd3a57f557569c83a"allowfullscreen=""frameborder="0"height="740"width="680"title="celery 实用程序"class="el n fd dy bg"scrolling="no"style="box-sizing: inherit; top: 0px; width: 680px; height: 740px; left: 0px;"></iframe>
Celery 实用程序
上述文件定义了一个工厂函数,该函数配置然后返回一个 Celery 应用实例。它将读取之前定义的所有与 celery 相关的配置。我们还更新了一些默认的 celery 配置。在创建 FastAPI 应用实例时
create_celery
调用此函数。
main.py
从
line 22
到
32
我们定义了另一个辅助函数
get_task_info
来获取通过 Celery 提交的异步任务的状态和结果。
4.添加 Celery 任务
现在,我们已经完成了所有与 Celery 相关的配置。现在让我们添加一些我们想要异步执行的任务。我们必须添加一个文件
celery_tasks/tasks.py
,其中包含我们想要异步执行的所有任务。
<iframe src="https://medium.com/media/88bf7bdc1e862ce7812fc2ed9f667238"allowfullscreen=""frameborder="0"height="517"width="680"title="celery 任务"class="el n fd dy bg"scrolling="no"style="box-sizing: inherit; top: 0px; width: 680px; height: 516.997px; left: 0px;"></iframe>
任务.py
在上面的文件中,我们定义了两个任务
get_all_universities_task
和
get_university_task
。
get_all_universities_task
用于获取作为输入提供的国家列表中的所有大学。
get_university_task
用于获取给定国家/地区的大学。
在这里,我们习惯于
shared_task
使代码可重用,这需要在方法
current_app
内使用实例
create_celery
,而不是创建新的 Celery 实例。@shared_task 装饰器返回一个始终使用 current_app 中的任务实例的代理。它允许我们创建可由任何应用程序使用的任务。由于 Celery 内置了对重试失败任务的支持,因此我们已为我们的任务启用了它。使用重试选项,我们现在可以让异常冒泡并让装饰器处理它。
5. 添加 APIRouter
现在,让我们添加一些入口点来测试我们的应用程序。我们将从一个简单的同步入口点开始。此入口点将调用一些外部 API 并将结果返回给客户端。在这里,我们使用我在之前的一个教程中使用过的相同免费 API。它在Apipheny中列出。我们将使用大学列表 API来获取指定国家/地区的大学列表。在本教程中,我们将获取作为 API 输入提供的大学列表。我们不会验证响应,因为我们只是模拟在我们的应用程序中执行一些长时间运行的任务来完成请求的场景,比如我们需要调用多个 API,在此基础上进行某些处理,然后返回结果。
让我们添加一个专用文件作为模块来处理大学请求
/routers/universities.py
。
<iframe src="https://medium.com/media/4d7b5cdf1a304118d3812d137a66939c"allowfullscreen=""frameborder="0"height="1374"width="680"title="大学路由器"class="el n fd dy bg"scrolling="no"style="box-sizing: inherit; top: 0px; width: 680px; height: 1373.99px; left: 0px;"></iframe>
大学_路由器.py
我们希望将与大学相关的路径操作与其余代码分开,以保持代码井然有序。但它仍然是同一个FastAPI应用程序的一部分。我们可以使用 为任何模块创建路径操作
APIRouter
。我们已经在 中的应用程序中包含了此路由器
step 3
。这里我们添加了 4 个不同的端点来测试我们的应用程序。
- get_universites :从
line 14
到22
我们定义了一个简单的 API,它将国家列表作为输入,然后调用一些外部 API 来获取结果。每个国家的结果被组合起来然后返回给调用者。添加此 API 只是为了检查 API 集成。它与 Celery 无关。 - get_universities_async:此 API 用于演示如何使用 Celery 异步执行长时间运行的任务。当我们调用此 API 时,它会向代理提交一条任务消息并返回该消息的 taskID。然后,消费者可以使用其他 API 来检查已提交任务的状态或获取任务结果。正如我们在这里看到的,我们只需要调用
get_all_universities_task
前面定义的方法step 4
。只需在函数名后添加即可调用 Celery 任务.apply_async()
。这将告诉 Celery 将新任务添加到队列。它在内部将任务提交到universities
队列。将有一个工作进程监听这个队列。工作进程将接收消息、处理请求并将结果保存在结果后端。 - get_universities_parallel:此 API 用于演示如何使用 Celery 将大任务拆分为较小的子任务并并行执行。调用此 API 时,我们会为输入的每个国家/地区创建一个任务。然后我们使用 Celery Group 对这些任务进行分组。Celery组是 惰性的,因此我们必须调用它来采取行动并评估该组。然后我们执行该组并等待它完成。一旦所有任务完成,该组将完成,我们将得到响应。之后,我们收集组的响应并将其返回给调用者。这是一个同步流程,它使用 Celery 并行执行子任务。
- get_task_status:此API用于获取使用我们之前描述的API之一提交的异步任务的状态和结果。
让我们看一下获取大学的代码:
<iframe src="https://medium.com/media/43597d4e1c113b5c234dadfb322d78c3"allowfullscreen=""frameborder="0"height="300"width="680"title="获取该国家/地区的所有大学"class="el n fd dy bg"scrolling="no"style="box-sizing: inherit; top: 0px; width: 680px; height: 299.983px; left: 0px;"></iframe>
我们
line 5
使用名为httpx 的 HTTP 客户端发送 HTTP 请求。HTTPX 是 Python 3 的功能齐全的 HTTP 客户端,它提供同步和异步 API,并支持 HTTP/1.1 和 HTTP/2。
在这里,我们使用同步 API 来获取大学数据。在上面的代码中,
line 6
我们调用大学 API 来获取结果。
line 7
我们将响应转换为
JSON
格式。然后从
line 9
到
11
,我们将对象映射
JSON
到Pydantic模型
University
。最后,我们返回字典,其中包含国家名称作为键和大学列表作为值。
现在我们的代码已经完成,可以使用 Celery 运行 FastAPI。
6.启动应用程序和 Celery Worker 服务器
step 3
我们已经添加了启动应用程序所需的代码。
FastAPI 是我们用来构建 API 的框架,Uvicorn是我们用来处理请求的服务器。我们已经安装了Uvicorn。那将是我们的服务器。我们可以通过执行以下命令来启动应用程序:
python 主要.py
应用程序成功启动后,我们可以导航到
http://localhost:9000/docs
。系统将打开一个类似这样的页面。我们可以在之前定义的 Swagger-UI 中看到端点。
Swagger-ui
现在让我们启动 Celery 工作服务器。我们需要打开一个新的终端并进入项目根目录。然后我们需要使用以下命令安装所有依赖项:
Pipenv 安装-r 要求.txt
一旦安装了所有依赖项,我们就可以执行以下命令:
celery -A main.celery worker --loglevel=info -Q 大学,大学
universities, university
Celery 工作者将会监听我们应用程序中使用的自定义队列: 。
芹菜工
现在一切都已启动并运行!让我们测试我们的应用程序。
7.测试应用程序
让我们用一些示例国家测试我们的大学端点,以验证外部 API 集成。在这里,我们获取“土耳其”和“印度”的所有大学。我们不验证响应或数据。
获取大学请求
获取大学回应
如上所示,我们获得了请求国家/地区的大学列表。在这里,我们使用大学列表 API获取指定国家/地区的大学列表。
一旦我们测试了外部 API 交互是否正常工作,让我们使用 Celery 测试异步任务执行。在这里,我们将尝试获取作为输入提供的同一组国家/地区的大学列表。
异步请求
异步响应
一旦提交请求,我们就会获得 task_id 作为确认响应。请求使用 Celery 作为单独的进程异步处理。我们可以使用其他 API 检查任务的状态。通过这种架构,我们可以处理大量请求。我们可以将用户请求排入队列,并根据应用程序的资源可用性对其进行处理。
让我们检查一下上面提交的异步任务的状态。我们可以使用下面的 API,以 task_id 作为输入。
获取任务状态请求
获取任务状态响应
现在让我们测试一下将用户请求拆分为多个子任务、使用 Celery 并行处理它们然后返回结果的场景。
并行请求
平行反应
这与之前测试的第一个 API 具有相同的请求和响应。但是,由于我们使用 Celery 并行获取每个国家的大学,因此执行速度会更快。在这里,我们在 0.58 秒内获得了响应,而第一个 API 则需要 1.67 秒。
8. 监控任务
到目前为止,我们可以看到使用 Celery 提交了一些要在后台执行的任务。如果我们可以使用 GUI 查看这些任务的进度,那就太好了。监控 Celery 应用程序的最简单方法是通过 Flower。使用它,我们可以轻松检查所有任务的进度。我们已经安装了所有依赖项。我们只需要启动
flower
。要在本地计算机上运行
flower
服务器,请从项目根目录执行以下命令:
芹菜 -A main.celery 花 --port=5555
花
我们现在可以看到我们的花卉监控工具正在我们的本地服务器上运行
http://localhost:5555/
。
花仪表板
在仪表板中,我们可以看到通过 Celery 执行的所有任务的摘要。
一旦任务提交之后,我们还可以通过点击
tasks
Flower导航栏上的Tab来查看每个任务的进度。
例如,我们可以看到每个任务的状态,从“进行中”到“成功”或“失败”。我们还可以看到任务的启动时间、运行时间和响应等等。
Celery 任务
结论
花卉监控工具正在我们的本地服务器上运行
http://localhost:5555/
。
[外链图片转存中…(img-cd6LYuX6-1722505743935)]
花仪表板
在仪表板中,我们可以看到通过 Celery 执行的所有任务的摘要。
一旦任务提交之后,我们还可以通过点击
tasks
Flower导航栏上的Tab来查看每个任务的进度。
例如,我们可以看到每个任务的状态,从“进行中”到“成功”或“失败”。我们还可以看到任务的启动时间、运行时间和响应等等。
Celery 任务
结论
在本教程中,我们了解了如何将 Celery 与 FastAPI 应用程序集成。将 Celery 与 FastAPI 应用程序一起使用不仅可以提高整体性能和效率,还有助于更好地处理长时间运行的任务的后台作业。我们还了解了如果我们想监控通过 Celery 提交的任务,那么如何使用 Flower 来做同样的事情。
博客原文:专业人工智能技术社区
版权归原作者 CloseAi论坛 所有, 如有侵权,请联系我们删除。