1 场景介绍
当我们使用python构建AI模型算法的过程中,经常会遇到如下的问题:
- 这个模型如何提供给其他微服务调用(比如JAVA构建的微服务)?
- 这个模型如何做到多个服务节点的负载均衡?
- 这个模型如何做到服务的备份与故障转移?
本项目通过一个实际的例子,展示如何基于FastAPI实现AI模型服务与nacos的完美融合,从而实现AI模型服务与其他微服务的互通、负载均衡、以及故障转移。本项目实现以下四个目标:
- 基于sklearn构建了一个kmeans算法模型
- 基于FastApi将模型通过发布成api接口
- 基于nacos的sdk,将api接口发布到nacos注册中心,供java等其它微服务调用,实现服务发现、负载均衡、故障转移
- 基于nacos的sdk,将nacos作为配置中心,实时同步nacos中的配置变化,作为构建模型时的参数
服务接口的输入参数:{"points":[[x1,y1],[x2,y2],[x3,y3],......,[xn,yn]]}
服务接口的输出:{"results": [center1,center1,center2,......,centerx]}
输出每个point值对应的center编号
1.1 FastAPI介绍
FastAPI是一个现代的,快速(高性能)python web框架。基于标准的python类型提示,使用python3.6+构建API的Web框架。
FastAPI的主要特点如下:
- 快速:非常高的性能,与NodeJS和Go相当(这个要感谢Starlette和Pydantic),是最快的Python框架之一。
- 快速编码:将开发速度提高约200%到300%。
- 更少的bug:减少大约40%的开发人员人为引起的错误。
- 直观:强大的编辑器支持,调试时间更短。
- 简单:易于使用和学习。减少阅读文档的时间。
- 代码简洁:尽量减少代码重复。每个参数可以声明多个功能,减少程序的bug。
- 健壮:生产代码会自动生成交互式文档。
- 基于标准:基于并完全兼容API的开放标准:OpenAPI和JSON模式。
1.2 Nacos介绍
Nacos 是一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。Nacos 致力于帮助您发现、配置和管理微服务。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据及流量管理。
Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。 Nacos 是构建以“服务”为中心的现代应用架构 (例如微服务范式、云原生范式) 的服务基础设施
Nacos 支持如下核心特性:
- 服务发现: 支持 DNS 与 RPC 服务发现,也提供原生 SDK 、OpenAPI 等多种服务注册方式和 DNS、HTTP 与 API 等多种服务发现方式。
- 2)服务健康监测: Nacos 提供对服务的实时的健康检查,阻止向不健康的主机或服务实例发送请求。
- 3)动态配置服务: Nacos 提供配置统一管理功能,能够帮助我们将配置以中心化、外部化和动态化的方式管理所有环境的应用配置和服务配置。
- 4)动态 DNS 服务: Nacos 支持动态 DNS 服务权重路由,能够让我们很容易地实现中间层负载均衡、更灵活的路由策略、流量控制以及数据中心内网的简单 DNS 解析服务。
- 5)服务及其元数据管理: Nacos 支持从微服务平台建设的视角管理数据中心的所有服务及元数据,包括管理服务的描述、生命周期、服务的静态依赖分析、服务的健康状态、服务的流量管理、路由及安全策略、服务的 SLA 以及最首要的 metrics 统计数据。
2 代码实现
2.1 构建一个kmeans的机器学习算法
from sklearn.cluster import KMeans
class SklKMeans:
def __init__(self, clusters):
self.model = KMeans(n_clusters=clusters, random_state=9)
def fit(self, inputs):
y_pred = self.model.fit_predict(inputs)
return y_pred
基于sklearn实现,clusters作为入参,聚类的中心点个数。
实际模型初始化是,这个参数是从nacos配置中心同步过来的。
2.2 基于FastApi将kmeans的机器学习算法发布成api接口
资源地址为:'/api/skl-kmeans'
访问参数为:{points:[ [1,2],[2,3],[3,5],[4,6],[8,8],[10,11],[12,12],[12,15],[10,12]]}
返回结果为:{ "results": [1, 1, 1, 1, 2, 0, 0, 0, 0]}
默认聚类中心点的个数:3
from fastapi import APIRouter
from pydantic import BaseModel
from skl_kmeans import SklKMeans
import settings
import numpy as np
router = APIRouter()
class ClusterRequest(BaseModel):
points: list
@router.post('/api/skl-kmeans')
async def api_skl_kmeans(req: ClusterRequest):
try:
model = SklKMeans(settings.clusters)
response = model.fit(np.asarray(req.points))
response = {"results": response.tolist()}
except Exception as ex:
response = None
return response
2.3 将FastApi接口注册到nacos注册中心
服务注册到nacos,服务名称为“skl-kmeans”,服务端口为9999,心跳间隔为30s
import nacos
from net_utils import net_helper
class NacosHelper:
service_name = None
service_port = None
service_group = None
def __init__(self, server_endpoint, namespace_id, username=None, password=None):
self.client = nacos.NacosClient(server_endpoint,
namespace=namespace_id,
username=username,
password=password)
self.endpoint = server_endpoint
self.service_ip = net_helper.get_host_ip()
def register(self):
self.client.add_naming_instance(self.service_name,
self.service_ip,
self.service_port,
group_name=self.service_group)
def unregister(self):
self.client.remove_naming_instance(self.service_name,
self.service_ip,
self.service_port)
def set_service(self, service_name, service_port, service_group):
self.service_name = service_name
self.service_port = service_port
self.service_group = service_group
async def beat_callback(self):
self.client.send_heartbeat(self.service_name,
self.service_ip,
self.service_port)
def load_conf(self, data_id, group):
return self.client.get_config(data_id=data_id, group=group, no_snapshot=True)
def add_conf_watcher(self, data_id, group, callback):
self.client.add_config_watcher(data_id=data_id, group=group, cb=callback)
nacos必备的参数有endpoint、namespace_id,group_name, username, password, data_id
nacos_endpoint = '192.168.1.238:8848'
nacos_namespace_id = '1bc91fa5-37e3-4269-8e41-3f4e4efb7633'
nacos_group_name = 'DEFAULT_GROUP'
nacos_username = 'nacos'
nacos_password = 'nacos'
nacos_data_id = 'skl-kmeans'
service_name = 'skl-kmeans'
service_port = 9999
beat_interval = 30
application.include_router(api_skl_kmeans.router, tags=['skl-kmeans'])
nacos = NacosHelper(nacos_endpoint, nacos_namespace_id, nacos_username, nacos_password)
nacos.set_service(service_name, service_port, nacos_group_name)
nacos.register()
@application.on_event('startup')
def init_scheduler():
scheduler = AsyncIOScheduler()
scheduler.add_job(nacos.beat_callback, 'interval', seconds=beat_interval)
scheduler.start()
2.4 将nacos作为配置中心,启动时读取配置,并实时监控配置变化
配置中心中的配置文件data_id为“skl-kmeans”,项目启动时,强制同步一次配置;更改配置文件时,程序能够自动监控配置文件变化,并自动更新到内存中。
service_name = 'skl-kmeans'
service_port = 9999
beat_interval = 30
nacos_endpoint = '192.168.1.238:8848'
nacos_namespace_id = '1bc91fa5-37e3-4269-8e41-3f4e4efb7633'
nacos_group_name = 'DEFAULT_GROUP'
nacos_username = 'nacos'
nacos_password = 'nacos'
nacos_data_id = 'skl-kmeans'
def load_config(content):
yaml_config = yaml.full_load(content)
clusters = yaml_config['clusters']
settings.clusters = clusters
def nacos_config_callback(args):
content = args['raw_content']
load_config(content)
# 注册配置变更监控回调
nacos.add_conf_watcher(nacos_data_id, nacos_group_name, nacos_config_callback)
# 启动时,强制同步一次配置
data_stream = nacos.load_conf(nacos_data_id,nacos_group_name)
load_config(data_stream)
2.5 在nacos中创建配置文件
配置文件的data_id为skl-kmeans
内容为:
2.6 启动服务
查看skl-kmeans算法服务在nacos的注册情况
已经注册成功,下面通过postman调用(也可以通过java微服务调用),构造请求参数points,实现points的聚类
{"points":[[1,2],[2,3],[3,5],[4,6],[8,8],[10,11],[12,12],[12,15],[10,12]]}
{
"results": [
1,
1,
1,
1,
2,
0,
0,
0,
0
]
}
动态调整nacos中的clusters参数,将3个中心点改为4个中心点,调整完成,kmeans算法已经自动同步了配置变化。通过postman再次调用接口,返回结果已经聚类到4个点
从而说明kmeans算法能够实时监控nacos中配置的变化,实现动态聚类
3 完整代码下载
代码地址:完整code
版权归原作者 智慧医疗探索者 所有, 如有侵权,请联系我们删除。