本文会带你从零搭建一个完整的概念验证项目(POC),技术栈涵盖 Adaptive RAG、LangGraph、FastAPI 和 Streamlit 四个核心组件。Adaptive RAG 负责根据查询复杂度自动调整检索策略;LangGraph 把多步 LLM 推理组织成有状态的可靠工作流;FastAPI 作为高性能后端暴露整条 AI 管道;Streamlit 则提供一个可以直接交互的前端界面。
读完这篇文章,你拿到的不只是理论——而是一个跑得起来的端到端 AI 系统。
要构建的是一个技术支持智能助手。它能理解用户查询,根据问题复杂度动态选择检索深度(Adaptive RAG),通过 LangGraph 执行推理工作流,经由 FastAPI 返回结果,最后在 Streamlit UI 上呈现响应。
这个场景针对的是一个真实痛点:团队面对大规模文档集时,传统 RAG 在模糊查询或多步骤问题上经常答非所问。
技术概览
Adaptive RAG
可以把 Adaptive RAG 理解为"搜索之前先思考"的 RAG。简单查询走轻量级检索就够了,遇到复杂问题则自动切换到多跳深度搜索、重排序或查询扩展,用更低的延迟换更高的准确率。
LangGraph
LangGraph 是用来构建有状态、多步骤 AI 工作流的框架。和传统链式调用不同它把 LLM 工作流建模成一张图——每个节点对应一个步骤(检索 → 推理 → 验证 → 响应),原生支持重试、记忆、循环和故障转移。对于需要在生产环境中保证可预测行为的场景,这种抽象比线性 chain 灵活得多。
FastAPI
FastAPI 把 Adaptive RAG + LangGraph 包装成 API 接口对外暴露,处理请求分发,天然适配异步 I/O。
Streamlit
前端用 Streamlit 搭建,聊天风格的界面,不需要写 HTML/CSS做 POC 演示足够了。
系统架构

数据流走向:
User → Query → Streamlit UI
Streamlit → Sends request → FastAPI
FastAPI → Passes query → LangGraph
LangGraph → Runs Adaptive RAG → Retriever
Retriever → Gets chunks → Vector DB
Vector DB → Returns results → LangGraph
LangGraph → Generates final response
FastAPI → Sends to UI → User
文件夹结构
项目结构尽量精简:
ai-poc/
│
├── backend/ # 后端逻辑
│ ├── app.py # FastAPI API 服务器
│ ├── rag_pipeline.py # Adaptive RAG 检索
│ ├── graph_workflow.py # LangGraph 工作流
│ ├── config.py # 配置和环境设置
│ ├── data/ # 源文档
│ └── __init__.py # 包初始化器
│
├── frontend/ # UI 层
│ ├── ui.py # Streamlit 界面
│ └── __init__.py # 包初始化器
│
├── .env # API 密钥和机密信息
├── requirements.txt # 项目依赖
└── README.md # 设置说明
requirements.txt 文件
fastapi
uvicorn[standard]
streamlit
requests
pydantic
langchain
langchain-community
langgraph
faiss-cpu
sentence-transformers
openai
python-dotenv
代码实现(关键代码片段)
Adaptive RAG 管道(rag_pipeline.py)
# backend/rag_pipeline.py
from typing import List
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document
class AdaptiveRAG:
"""
Adaptive Retrieval Pipeline
"""
def __init__(self, vector_db: FAISS):
self.db = vector_db
def retrieve(self, query: str) -> List[Document]:
if not query.strip():
return []
# Adaptive heuristic
token_count = len(query.split())
k = 3 if token_count < 6 else 8
return self.db.similarity_search(query, k=k)
def build_vector_store(texts: List[str]) -> FAISS:
"""
Build FAISS index from raw texts (POC only).
In production load persisted DB instead.
"""
embeddings = HuggingFaceEmbeddings(
model_name="sentence-transformers/all-MiniLM-L6-v2"
)
splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=100
)
docs = []
for text in texts:
chunks = splitter.split_text(text)
for chunk in chunks:
docs.append(chunk)
return FAISS.from_texts(docs, embeddings)
自适应的核心逻辑其实很简单:根据查询的 Token 数决定检索深度。查询短于 6 个词就取 3 条结果,否则拉 8 条。这是一个粗粒度的启发式方法,在 POC 阶段够用,生产环境可以替换成更精细的分类器。
build_vector_store
函数从原始文本构建 FAISS 索引。注意这里每次启动都重建索引,生产上应该加载持久化的数据库。
LangGraph 工作流(graph_workflow.py)
# backend/graph_workflow.py
from typing import TypedDict, List
from langgraph.graph import StateGraph, END
from langchain.schema import Document
from langchain_openai import ChatOpenAI
class GraphState(TypedDict):
question: str
docs: List[Document]
answer: str
def create_workflow(rag):
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
workflow = StateGraph(GraphState)
# Retrieval Node
async def retrieve_node(state: GraphState):
docs = rag.retrieve(state["question"])
return {"docs": docs}
# Reasoning Node
async def reasoning_node(state: GraphState):
question = state["question"]
docs = state.get("docs", [])
context = "\n\n".join([d.page_content for d in docs])
prompt = f"""
You are a technical assistant.
Use ONLY the context below to answer the question.
If the answer is not in the context, say you don't know.
Context:
{context}
Question:
{question}
"""
response = await llm.ainvoke(prompt)
return {"answer": response.content}
# Add nodes
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("reason", reasoning_node)
# Connect nodes
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "reason")
workflow.add_edge("reason", END)
return workflow.compile()
整个工作流只有两个节点:retrieve 负责检索,reason 负责推理生成答案。GraphState 作为 TypedDict 在节点间传递状态。流程很线性——先检索再推理,然后结束。实际项目中可以在这个图上加验证节点、循环重试等分支,LangGraph 的图结构天然支持这种扩展。
FastAPI 后端(app.py)
# backend/app.py
import os
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from dotenv import load_dotenv
from rag_pipeline import AdaptiveRAG, build_vector_store
from graph_workflow import create_workflow
load_dotenv()
app = FastAPI(title="Adaptive RAG API")
# ---------------------------
# Startup Initialization
# ---------------------------
class AskRequest(BaseModel):
query: str
@app.on_event("startup")
async def startup_event():
global workflow
# Sample knowledge base (replace with real docs)
sample_docs = [
"LangGraph supports stateful workflows and retry logic.",
"Adaptive RAG dynamically changes retrieval depth based on query complexity.",
"FastAPI is a high-performance async Python framework.",
]
vector_db = build_vector_store(sample_docs)
rag = AdaptiveRAG(vector_db)
workflow = create_workflow(rag)
# ---------------------------
# API Endpoint
# ---------------------------
@app.post("/ask")
async def ask(payload: AskRequest):
if not payload.query.strip():
raise HTTPException(status_code=400, detail="Query cannot be empty")
try:
result = await workflow.ainvoke(
{"question": payload.query}
)
return {"response": result["answer"]}
except Exception as e:
raise HTTPException(
status_code=500,
detail="Internal RAG processing error"
)
后端在启动时完成向量库构建和工作流初始化,之后通过 /ask 端点接收查询请求。这里用了 global 变量来持有 workflow 实例——POC 阶段这样做没问题,上生产建议用依赖注入替代。
Streamlit UI(ui.py)
# frontend/ui.py
import streamlit as st
import requests
API_URL = "http://localhost:8000/ask"
st.set_page_config(page_title="Adaptive RAG Assistant")
st.title("Adaptive RAG Support Assistant")
query = st.text_input("Enter your question")
if st.button("Ask"):
if not query.strip():
st.warning("Please enter a question.")
else:
try:
with st.spinner("Thinking..."):
response = requests.post(
API_URL,
json={"query": query},
timeout=60
)
response.raise_for_status()
answer = response.json()["response"]
st.markdown("### Answer:")
st.write(answer)
except Exception as e:
st.error(f"Error: {e}")
前端就这么几行:输入框接收问题,按钮触发请求,拿到结果直接渲染。Streamlit 的好处就是不用折腾前端那套东西,做 POC 验证概念足够。
运行项目
安装依赖:
pip install -r requirements.txt
设置 OpenAI Key:
export OPENAI_API_KEY="your_key_here"
Or(For Windows)
setx OPENAI_API_KEY "your_key_here"
启动后端:
uvicorn backend.app:app --reload
启动前端:
streamlit run frontend/ui.py
内部执行流程
在 UI 中输入这样一条查询
How does retry logic work in LangGraph workflows?
请求先到达 FastAPI 后端。LangGraph 工作流从 retrieve 节点启动,Adaptive RAG 根据查询长度动态选定检索深度——短查询取
k=3
长查询取
k=8
。从向量数据库拉到相关文档块后,reasoning 节点把这些上下文拼装成 prompt,交给 LLM 生成答案。LLM 的回答被限定在检索到的上下文范围内,最终结果沿原路返回到 UI。
一切正常的话,你现在手上就有了一条完整的端到端 RAG 管道:UI → API → Graph → Retriever → LLM → Response。
下一步:生产部署
POC 跑通了但离生产还有距离。下面按模块列一下需要补强的方向。
检索层
向量相似度搜索可以跟 BM25 关键词搜索做混合,在一些 edge case 上召回率会好很多。检索完 top-k 文档后,再套一层 Cross-Encoder 做重排序,排序精度能上一个台阶。如果系统要支持多团队或多租户,还得引入基于命名空间的文档隔离,防止跨域信息泄漏。
工作流
当前工作流里没有验证环节。生产环境建议加一个验证节点,检查生成的答案是否真的有检索上下文支撑——这对控制幻觉至关重要。另外,如果要做多轮对话,就需要往图里加记忆节点来持久化对话状态。
重试与回退
LLM 调用失败后要有重试机制。主模型不可用时能自动降级到更小或更便宜的备选模型。超时控制和优雅降级也不能少。
成本控制也值得考虑:简单查询走轻量模型,只在必要时才升级到大模型。
可观测性与评估
日志要记全——检索分数、命中的文档、响应延迟、Token 消耗,这些都得有。定期做离线评估,准备好测试数据集,跑检索质量和回答质量的指标。幻觉监控单独拎出来盯,追踪那些答案脱离检索上下文的 case。
UI 改进
聊天界面得支持历史记录和多轮对话。回答来源要做高亮——用户应该能看到答案是从哪几段文档生成的。再加上反馈按钮,让用户对回答打分,收集回来的数据可以用于后续评估和微调。
部署与基础设施
前后端都做 Docker 容器化,保证部署的可复现性。上云的话 AWS、GCP、Azure 都行,记得配自动扩缩容。端点要上 HTTPS 和 Token 认证(JWT 或 OAuth)。生产环境的 API Key 不要再靠环境变量了,换托管的密钥存储服务。
总结
一个模块化、可扩展的 RAG 架构就搭建完成了,它完全可以从当前的 POC 状态逐步演化成生产级系统。自适应检索、有状态编排、可扩展 API、简洁的交互界面——这几个构建块拼在一起,基本覆盖了现代 LLM 应用的核心架构需求。
by Robi Kumar Tomar