Deploying Open-Source Models - FastAPI for Model Interaction - WebSocket (Part 6)

**I. Preface**

Using FastAPI makes deploying AI interaction services simpler and more efficient. FastAPI makes it easy to build APIs quickly: developers can define the input and output formats a model needs and implement the corresponding business logic with little ceremony.

FastAPI's asynchronous, high-performance architecture can handle large numbers of concurrent prediction requests, giving users a smooth interactive experience. It also works well with containerized deployment: developers can package an AI model as a Docker image and deploy and scale it across environments.

In short, FastAPI can greatly improve both development efficiency and user experience for AI applications, providing well-rounded support for model deployment and interaction.

Building on Deploying Open-Source Models - FastAPI for Model Interaction - WebSocket (Part 5), this article shows how to *integrate a Tool to fetch real-time data and return it as a stream*.

**II. Terminology**

### 2.1. Tool

A Tool is one of a family of auxiliary mechanisms designed to extend a language model's capabilities and practical usefulness. Code Interpreter and Knowledge Retrieval, for example, are both tools in this sense.
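In LangChain, defining a custom tool can be as simple as decorating a function. A minimal sketch (the `multiply` tool here is a made-up illustration, not part of this article's code):

```python
from langchain_core.tools import tool

@tool
def multiply(a: int, b: int) -> int:
    """Multiply two integers. The docstring becomes the tool description shown to the model."""
    return a * b
```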

### 2.2. LangChain's built-in tools

https://github.com/langchain-ai/langchain/tree/v0.1.16/docs/docs/integrations/tools

These built-in tools cover most common needs; see the link above for how to use each of them.

### 2.3. LangChain's streaming output methods

  • stream: the basic streaming method; it yields the agent's actions and observations step by step.
  • astream: the asynchronous variant, for code with async processing needs.
  • astream_events: the most fine-grained method; it streams every individual event of an agent run (tool calls starting and ending, the model starting and ending, token chunks, and so on), which makes it easy to inspect and monitor the agent's execution in detail. A short example follows this list.
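For example (a sketch assuming an `agent_executor` like the one built in section 4.1, not from the original article):

```python
# Coarse-grained: astream yields one dict per agent step (action taken, observation, final output).
async for step in agent_executor.astream({"input": "..."}):
    print(step)

# Fine-grained: astream_events yields one event per token, tool start/end, model start/end, etc.
async for event in agent_executor.astream_events({"input": "..."}, version="v2"):
    print(event["event"])
```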

### 2.4. langchainhub

langchainhub is a central collection of LangChain-related artifacts; its purpose is to make it easy for developers to discover and share commonly used prompts, chains, and agents.

Inspired by the Hugging Face Hub, it promotes community exchange and collaboration and drives the growth of the LangChain ecosystem. In the current architecture it lives inside LangSmith and focuses mainly on prompts.
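For example, pulling a shared prompt is a single call (section 4.2 below uses exactly this):

```python
from langchain import hub

# Download the structured-chat-agent prompt by its public handle.
prompt = hub.pull("hwchase17/structured-chat-agent")
print(prompt)
```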

### 2.5. asyncio

asyncio is a Python standard-library module for writing concurrent code; it provides the foundation for building asynchronous applications.
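A minimal example:

```python
import asyncio

async def main():
    # await suspends this coroutine without blocking the event loop.
    await asyncio.sleep(1)
    print("hello from asyncio")

asyncio.run(main())  # create an event loop, run the coroutine to completion, close the loop
```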

**III. Prerequisites**

### 3.1. Create a virtual environment & install dependencies

On top of Part 5's environment, add the dependency packages for Google Search and langchainhub:

```bash
conda create -n fastapi_test python=3.10
conda activate fastapi_test
pip install fastapi websockets uvicorn
pip install --quiet langchain-core langchain-community langchain-openai
pip install google-search-results langchainhub
```
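Optionally, a quick import check to confirm the environment is ready (a sketch; each module name below is what I expect the corresponding package above to expose):

```python
import fastapi              # fastapi
import websockets           # websockets
import langchain_core       # langchain-core
import langchain_community  # langchain-community
import langchain_openai     # langchain-openai
import serpapi              # google-search-results
import langchainhub         # langchainhub
```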

### 3.2. Register a Google Search API account

See: Deploying Open-Source Models - FastAPI for Model Interaction - WebSocket (Part 5).

### 3.3. Generate a Google Search API key


**IV. Implementation**

### 4.1. Using a Tool with streaming output

````python
# -*- coding: utf-8 -*-
import asyncio
import os

from langchain.agents import create_structured_chat_agent, AgentExecutor
from langchain_community.utilities.serpapi import SerpAPIWrapper
from langchain_core.prompts import SystemMessagePromptTemplate, HumanMessagePromptTemplate, ChatPromptTemplate
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI

os.environ["OPENAI_API_KEY"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'  # your OpenAI key
os.environ["SERPAPI_API_KEY"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0, max_tokens=512)

@tool
def search(query: str):
    """Use this tool only when you need real-time information or facts you do not know; pass in the text to search for."""
    serp = SerpAPIWrapper()
    result = serp.run(query)
    print("real-time search result:", result)
    return result

tools = [search]

# System prompt for the structured chat agent (same content as hwchase17/structured-chat-agent).
template = '''Respond to the human as helpfully and accurately as possible. You have access to the following tools:

{tools}

Use a json blob to specify a tool by providing an action key (tool name) and an action_input key (tool input).

Valid "action" values: "Final Answer" or {tool_names}

Provide only ONE action per $JSON_BLOB, as shown:

```
{{
  "action": $TOOL_NAME,
  "action_input": $INPUT
}}
```

Follow this format:

Question: input question to answer
Thought: consider previous and subsequent steps
Action:
```
$JSON_BLOB
```
Observation: action result
... (repeat Thought/Action/Observation N times)
Thought: I know what to respond
Action:
```
{{
  "action": "Final Answer",
  "action_input": "Final response to human"
}}
```

Begin! Reminder to ALWAYS respond with a valid json blob of a single action. Use tools if necessary. Respond directly if appropriate. Format is Action:```$JSON_BLOB```then Observation'''

system_message_prompt = SystemMessagePromptTemplate.from_template(template)

human_template = '''{input}

{agent_scratchpad}

(reminder to respond in a JSON blob no matter what)'''

human_message_prompt = HumanMessagePromptTemplate.from_template(human_template)
prompt = ChatPromptTemplate.from_messages([system_message_prompt, human_message_prompt])

print(prompt)

agent = create_structured_chat_agent(llm, tools, prompt)

agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)

async def chat(params):
    # astream_events yields fine-grained events; keep only the token chunks from the chat model.
    events = agent_executor.astream_events(params, version="v2")
    async for event in events:
        kind = event['event']
        if kind == 'on_chat_model_stream':
            content = event['data']['chunk'].content
            if content and len(content) > 0:
                print(content)

asyncio.run(chat({"input": "广州现在天气如何?"}))  # "What's the weather like in Guangzhou right now?"
````


Result:

![](https://img-blog.csdnimg.cn/direct/8636945b435344cbbd1ca5330250ed33.png)

Notes:

The structure of each streamed event looks like this:

```
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='天', id='run-92515b63-4b86-4af8-8515-2f84def9dfab')}, 'run_id': '92515b63-4b86-4af8-8515-2f84def9dfab', 'name': 'ChatOpenAI', 'tags': ['seq:step:3'], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 512, 'ls_stop': ['\nObservation']}}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='气', id='run-92515b63-4b86-4af8-8515-2f84def9dfab')}, 'run_id': '92515b63-4b86-4af8-8515-2f84def9dfab', 'name': 'ChatOpenAI', 'tags': ['seq:step:3'], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 512, 'ls_stop': ['\nObservation']}}
```
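Besides `on_chat_model_stream`, `astream_events` also emits tool-related events such as `on_tool_start` and `on_tool_end`, which can be used, for instance, to show a "searching..." hint before results stream back. A sketch extending the `chat` function above (not part of the original article):

```python
async def chat_with_tool_events(params):
    async for event in agent_executor.astream_events(params, version="v2"):
        kind = event['event']
        if kind == 'on_tool_start':
            # Fired when the agent invokes a tool, e.g. our `search` tool.
            print(f"calling tool {event['name']} with input {event['data'].get('input')}")
        elif kind == 'on_tool_end':
            # Fired when the tool returns its observation.
            print(f"tool {event['name']} finished")
        elif kind == 'on_chat_model_stream':
            content = event['data']['chunk'].content
            if content:
                print(content, end="")
```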


### 4.2. Using a public prompt from langchainhub

Adjust the code from 4.1 (Using a Tool with streaming output) as follows:

```python
# -*- coding: utf-8 -*-
import asyncio
import os

from langchain import hub
from langchain.agents import create_structured_chat_agent, AgentExecutor
from langchain_community.utilities.serpapi import SerpAPIWrapper
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI

os.environ["OPENAI_API_KEY"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'  # your OpenAI key
os.environ["SERPAPI_API_KEY"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0, max_tokens=512)

@tool
def search(query: str):
    """Use this tool only when you need real-time information or facts you do not know; pass in the text to search for."""
    serp = SerpAPIWrapper()
    result = serp.run(query)
    print("real-time search result:", result)
    return result

tools = [search]

# Instead of writing the prompt by hand as in 4.1, pull the shared one from langchainhub.
prompt = hub.pull("hwchase17/structured-chat-agent")

print(prompt)

agent = create_structured_chat_agent(llm, tools, prompt)

agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)

async def chat(params):
    events = agent_executor.astream_events(params, version="v2")
    async for event in events:
        kind = event['event']
        if kind == 'on_chat_model_stream':
            content = event['data']['chunk'].content
            if content and len(content) > 0:
                print(content)

asyncio.run(chat({"input": "广州现在天气如何?"}))
```


Result:

![](https://img-blog.csdnimg.cn/direct/a24d626f9a52431f84d53448a8d8dbff.png)

### 4.3. Putting it all together

Adjust the code from Deploying Open-Source Models - FastAPI for Model Interaction - WebSocket (Part 5) as follows:

```python
import os

import uvicorn
from typing import Annotated
from fastapi import (
    Depends,
    FastAPI,
    WebSocket,
    WebSocketException,
    WebSocketDisconnect,
    status,
)
from langchain import hub
from langchain.agents import create_structured_chat_agent, AgentExecutor
from langchain_community.utilities import SerpAPIWrapper
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI

os.environ["OPENAI_API_KEY"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'  # your OpenAI key
os.environ["SERPAPI_API_KEY"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

manager = ConnectionManager()

app = FastAPI()

async def authenticate(
    websocket: WebSocket,
    userid: str,
    secret: str,
):
    if userid is None or secret is None:
        raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)

    print(f'userid: {userid}, secret: {secret}')
    if '12345' == userid and 'xxxxxxxxxxxxxxxxxxxxxxxxxx' == secret:
        return 'pass'
    else:
        return 'fail'

@tool
def search(query: str):
    """Use this tool only when you need real-time information or facts you do not know; pass in the text to search for."""
    serp = SerpAPIWrapper()
    result = serp.run(query)
    print("real-time search result:", result)
    return result

def get_prompt():
    prompt = hub.pull("hwchase17/structured-chat-agent")
    return prompt

async def chat(query):
    global llm, tools
    agent = create_structured_chat_agent(llm, tools, get_prompt())

    agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)

    events = agent_executor.astream_events({"input": query}, version="v1")
    async for event in events:
        kind = event['event']
        if kind == 'on_chat_model_stream':
            content = event['data']['chunk'].content
            if content and len(content) > 0:
                print(content)
                yield content

@app.websocket("/ws")
async def websocket_endpoint(
    *,
    websocket: WebSocket,
    userid: str,
    permission: Annotated[str, Depends(authenticate)],
):
    await manager.connect(websocket)
    try:
        while True:
            text = await websocket.receive_text()

            if 'fail' == permission:
                await manager.send_personal_message("authentication failed", websocket)
            else:
                if text is not None and len(text) > 0:
                    # Forward each streamed token to the client as it arrives.
                    async for msg in chat(text):
                        await manager.send_personal_message(msg, websocket)

    except WebSocketDisconnect:
        manager.disconnect(websocket)
        print(f"Client #{userid} left the chat")
        await manager.broadcast(f"Client #{userid} left the chat")

if __name__ == '__main__':
    tools = [search]
    llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0, max_tokens=512)
    uvicorn.run(app, host='0.0.0.0', port=7777)
```


Client:

```html
<!DOCTYPE html>
<html>
<head>
    <title>Chat</title>
</head>
<body>
    <h1>WebSocket Chat</h1>
    <form action="" onsubmit="sendMessage(event)">
        <label>USERID: <input type="text" id="userid" autocomplete="off" value="12345"/></label>
        <label>SECRET: <input type="text" id="secret" autocomplete="off" value="xxxxxxxxxxxxxxxxxxxxxxxxxx"/></label>
        <button onclick="connect(event)">Connect</button>
        <label>Message: <input type="text" id="messageText" autocomplete="off"/></label>
        <button>Send</button>
    </form>
    <!-- the script below appends each incoming message to this list -->
    <ul id='messages'></ul>
    <script>
        var ws = null;
        function connect(event) {
            var userid = document.getElementById("userid")
            var secret = document.getElementById("secret")
            ws = new WebSocket("ws://localhost:7777/ws?userid=" + userid.value + "&secret=" + secret.value);
            ws.onmessage = function(event) {
                var messages = document.getElementById('messages')
                var message = document.createElement('li')
                var content = document.createTextNode(event.data)
                message.appendChild(content)
                messages.appendChild(message)
            };
            event.preventDefault()
        }
        function sendMessage(event) {
            var input = document.getElementById("messageText")
            ws.send(input.value)
            input.value = ''
            event.preventDefault()
        }
    </script>
</body>
</html>
```
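To try it end to end: start the server with Python (for example `python server.py`, assuming the server code above is saved under that hypothetical filename), open the HTML page in a browser, click Connect, then type a message and press Send; the streamed tokens appear one by one as list items.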

Results:

User input: 你好 ("Hello")

No tool call is triggered.

Model output:

User input: 广州现在天气如何? ("What's the weather like in Guangzhou right now?")

A tool call is required.

Model output:


```
Action:
{
  "action": "Final Answer",
  "action_input": "广州现在的天气是多云,温度为87华氏度,降水概率为7%,湿度为76%,风力为7英里/小时。"
}
```

(The final answer reads: "The weather in Guangzhou is currently cloudy, 87°F, with a 7% chance of precipitation, 76% humidity, and wind at 7 mph.")

PS:

  1. The above is only meant to demonstrate streaming output; the raw stream contains redundant information, e.g. "action": "Final Answer", which should be filtered out according to your needs (see the sketch after this list).

  2. The page output styling here is only for demonstration and can be adjusted as needed.
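For point 1, one possible approach (a sketch, not from the original article) is to accumulate the streamed text and forward only the part inside the final answer's "action_input" value:

```python
def extract_final_answer(streamed_text: str) -> str:
    """Return the (possibly still growing) final answer inside the streamed JSON blob.

    Assumes the model emits '"action": "Final Answer"' before '"action_input": "'
    with this exact spacing, and ignores escaped quotes; adjust for production use.
    """
    idx = streamed_text.find('"action": "Final Answer"')
    if idx == -1:
        return ""  # not a final-answer blob (yet)
    key = '"action_input": "'
    start = streamed_text.find(key, idx)
    if start == -1:
        return ""
    start += len(key)
    end = streamed_text.find('"', start)
    # While the closing quote has not arrived, everything after the key is answer text.
    return streamed_text[start:end] if end != -1 else streamed_text[start:]

# In the websocket handler, send only the newly arrived part of the answer:
# acc, sent = "", 0
# async for token in chat(text):
#     acc += token
#     answer = extract_final_answer(acc)
#     if len(answer) > sent:
#         await manager.send_personal_message(answer[sent:], websocket)
#         sent = len(answer)
```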


Reprinted from: https://blog.csdn.net/qq839019311/article/details/140129692
Copyright belongs to the original author, 开源技术探险家. If there is any infringement, please contact us for removal.
