0


手把手搭建 Adaptive RAG 系统:从向量检索到 Streamlit 前端全流程

本文会带你从零搭建一个完整的概念验证项目(POC),技术栈涵盖 Adaptive RAG、LangGraph、FastAPI 和 Streamlit 四个核心组件。Adaptive RAG 负责根据查询复杂度自动调整检索策略;LangGraph 把多步 LLM 推理组织成有状态的可靠工作流;FastAPI 作为高性能后端暴露整条 AI 管道;Streamlit 则提供一个可以直接交互的前端界面。

读完这篇文章,你拿到的不只是理论——而是一个跑得起来的端到端 AI 系统。

要构建的是一个技术支持智能助手。它能理解用户查询,根据问题复杂度动态选择检索深度(Adaptive RAG),通过 LangGraph 执行推理工作流,经由 FastAPI 返回结果,最后在 Streamlit UI 上呈现响应。

这个场景针对的是一个真实痛点:团队面对大规模文档集时,传统 RAG 在模糊查询或多步骤问题上经常答非所问。

技术概览

Adaptive RAG

可以把 Adaptive RAG 理解为"搜索之前先思考"的 RAG。简单查询走轻量级检索就够了,遇到复杂问题则自动切换到多跳深度搜索、重排序或查询扩展,用更低的延迟换更高的准确率。

LangGraph

LangGraph 是用来构建有状态、多步骤 AI 工作流的框架。和传统链式调用不同它把 LLM 工作流建模成一张图——每个节点对应一个步骤(检索 → 推理 → 验证 → 响应),原生支持重试、记忆、循环和故障转移。对于需要在生产环境中保证可预测行为的场景,这种抽象比线性 chain 灵活得多。

FastAPI

FastAPI 把 Adaptive RAG + LangGraph 包装成 API 接口对外暴露,处理请求分发,天然适配异步 I/O。

Streamlit

前端用 Streamlit 搭建,聊天风格的界面,不需要写 HTML/CSS做 POC 演示足够了。

系统架构

数据流走向:

 User → Query → Streamlit UI  
 Streamlit → Sends request → FastAPI  
 FastAPI → Passes query → LangGraph  
 LangGraph → Runs Adaptive RAG → Retriever  
 Retriever → Gets chunks → Vector DB  
 Vector DB → Returns results → LangGraph  
 LangGraph → Generates final response  
 FastAPI → Sends to UI → User

文件夹结构

项目结构尽量精简:

 ai-poc/  
│  
├── backend/                      # 后端逻辑  
│   ├── app.py                    # FastAPI API 服务器  
│   ├── rag_pipeline.py           # Adaptive RAG 检索  
│   ├── graph_workflow.py         # LangGraph 工作流  
│   ├── config.py                 # 配置和环境设置  
│   ├── data/                     # 源文档  
│   └── __init__.py               # 包初始化器  
│  
├── frontend/                     # UI 层  
│   ├── ui.py                     # Streamlit 界面  
│   └── __init__.py               # 包初始化器  
│  
├── .env                          # API 密钥和机密信息  
├── requirements.txt              # 项目依赖  
 └── README.md                     # 设置说明

requirements.txt 文件

 fastapi  
uvicorn[standard]  
streamlit  
requests  
pydantic  
langchain  
langchain-community  
langgraph  
faiss-cpu  
sentence-transformers  
openai  
 python-dotenv

代码实现(关键代码片段)

Adaptive RAG 管道(rag_pipeline.py)

 # backend/rag_pipeline.py  

from typing import List  
from langchain_community.vectorstores import FAISS  
from langchain_community.embeddings import HuggingFaceEmbeddings  
from langchain.text_splitter import RecursiveCharacterTextSplitter  
from langchain.schema import Document  

class AdaptiveRAG:  
    """  
    Adaptive Retrieval Pipeline  
    """  

    def __init__(self, vector_db: FAISS):  
        self.db = vector_db  

    def retrieve(self, query: str) -> List[Document]:  
        if not query.strip():  
            return []  

        # Adaptive heuristic  
        token_count = len(query.split())  
        k = 3 if token_count < 6 else 8  

        return self.db.similarity_search(query, k=k)  

def build_vector_store(texts: List[str]) -> FAISS:  
    """  
    Build FAISS index from raw texts (POC only).  
    In production load persisted DB instead.  
    """  

    embeddings = HuggingFaceEmbeddings(  
        model_name="sentence-transformers/all-MiniLM-L6-v2"  
    )  

    splitter = RecursiveCharacterTextSplitter(  
        chunk_size=1000,  
        chunk_overlap=100  
    )  

    docs = []  
    for text in texts:  
        chunks = splitter.split_text(text)  
        for chunk in chunks:  
            docs.append(chunk)  

     return FAISS.from_texts(docs, embeddings)

自适应的核心逻辑其实很简单:根据查询的 Token 数决定检索深度。查询短于 6 个词就取 3 条结果,否则拉 8 条。这是一个粗粒度的启发式方法,在 POC 阶段够用,生产环境可以替换成更精细的分类器。

build_vector_store

函数从原始文本构建 FAISS 索引。注意这里每次启动都重建索引,生产上应该加载持久化的数据库。

LangGraph 工作流(graph_workflow.py)

 # backend/graph_workflow.py  

from typing import TypedDict, List  
from langgraph.graph import StateGraph, END  
from langchain.schema import Document  
from langchain_openai import ChatOpenAI  

class GraphState(TypedDict):  
    question: str  
    docs: List[Document]  
    answer: str  

def create_workflow(rag):  

    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)  

    workflow = StateGraph(GraphState)  

    # Retrieval Node  
    async def retrieve_node(state: GraphState):  
        docs = rag.retrieve(state["question"])  
        return {"docs": docs}  

    # Reasoning Node  
    async def reasoning_node(state: GraphState):  
        question = state["question"]  
        docs = state.get("docs", [])  

        context = "\n\n".join([d.page_content for d in docs])  

        prompt = f"""  
        You are a technical assistant.  

        Use ONLY the context below to answer the question.  
        If the answer is not in the context, say you don't know.  

        Context:  
        {context}  

        Question:  
        {question}  
        """  

        response = await llm.ainvoke(prompt)  

        return {"answer": response.content}  

    # Add nodes  
    workflow.add_node("retrieve", retrieve_node)  
    workflow.add_node("reason", reasoning_node)  

    # Connect nodes  
    workflow.set_entry_point("retrieve")  
    workflow.add_edge("retrieve", "reason")  
    workflow.add_edge("reason", END)  

     return workflow.compile()

整个工作流只有两个节点:retrieve 负责检索,reason 负责推理生成答案。GraphState 作为 TypedDict 在节点间传递状态。流程很线性——先检索再推理,然后结束。实际项目中可以在这个图上加验证节点、循环重试等分支,LangGraph 的图结构天然支持这种扩展。

FastAPI 后端(app.py)

 # backend/app.py  

import os  
from fastapi import FastAPI, HTTPException  
from pydantic import BaseModel  
from dotenv import load_dotenv  

from rag_pipeline import AdaptiveRAG, build_vector_store  
from graph_workflow import create_workflow  

load_dotenv()  

app = FastAPI(title="Adaptive RAG API")  

# ---------------------------  
# Startup Initialization  
# ---------------------------  

class AskRequest(BaseModel):  
    query: str  

@app.on_event("startup")  
async def startup_event():  
    global workflow  

    # Sample knowledge base (replace with real docs)  
    sample_docs = [  
        "LangGraph supports stateful workflows and retry logic.",  
        "Adaptive RAG dynamically changes retrieval depth based on query complexity.",  
        "FastAPI is a high-performance async Python framework.",  
    ]  

    vector_db = build_vector_store(sample_docs)  
    rag = AdaptiveRAG(vector_db)  

    workflow = create_workflow(rag)  

# ---------------------------  
# API Endpoint  
# ---------------------------  

@app.post("/ask")  
async def ask(payload: AskRequest):  
    if not payload.query.strip():  
        raise HTTPException(status_code=400, detail="Query cannot be empty")  

    try:  
        result = await workflow.ainvoke(  
            {"question": payload.query}  
        )  

        return {"response": result["answer"]}  

    except Exception as e:  
        raise HTTPException(  
            status_code=500,  
            detail="Internal RAG processing error"  
         )

后端在启动时完成向量库构建和工作流初始化,之后通过 /ask 端点接收查询请求。这里用了 global 变量来持有 workflow 实例——POC 阶段这样做没问题,上生产建议用依赖注入替代。

Streamlit UI(ui.py)

 # frontend/ui.py  

import streamlit as st  
import requests  

API_URL = "http://localhost:8000/ask"  

st.set_page_config(page_title="Adaptive RAG Assistant")  
st.title("Adaptive RAG Support Assistant")  

query = st.text_input("Enter your question")  

if st.button("Ask"):  
    if not query.strip():  
        st.warning("Please enter a question.")  
    else:  
        try:  
            with st.spinner("Thinking..."):  
                response = requests.post(  
                    API_URL,  
                    json={"query": query},  
                    timeout=60  
                )  
                response.raise_for_status()  

            answer = response.json()["response"]  

            st.markdown("### Answer:")  
            st.write(answer)  

        except Exception as e:  
             st.error(f"Error: {e}")

前端就这么几行:输入框接收问题,按钮触发请求,拿到结果直接渲染。Streamlit 的好处就是不用折腾前端那套东西,做 POC 验证概念足够。

运行项目

安装依赖:

 pip install -r requirements.txt

设置 OpenAI Key:

 export OPENAI_API_KEY="your_key_here"  
   
            Or(For Windows)  
   
 setx OPENAI_API_KEY "your_key_here" 

启动后端:

 uvicorn backend.app:app --reload

启动前端:

 streamlit run frontend/ui.py

内部执行流程

在 UI 中输入这样一条查询

 How does retry logic work in LangGraph workflows?

请求先到达 FastAPI 后端。LangGraph 工作流从 retrieve 节点启动,Adaptive RAG 根据查询长度动态选定检索深度——短查询取

k=3

长查询取

k=8

。从向量数据库拉到相关文档块后,reasoning 节点把这些上下文拼装成 prompt,交给 LLM 生成答案。LLM 的回答被限定在检索到的上下文范围内,最终结果沿原路返回到 UI。

一切正常的话,你现在手上就有了一条完整的端到端 RAG 管道:UI → API → Graph → Retriever → LLM → Response。

下一步:生产部署

POC 跑通了但离生产还有距离。下面按模块列一下需要补强的方向。

检索层

向量相似度搜索可以跟 BM25 关键词搜索做混合,在一些 edge case 上召回率会好很多。检索完 top-k 文档后,再套一层 Cross-Encoder 做重排序,排序精度能上一个台阶。如果系统要支持多团队或多租户,还得引入基于命名空间的文档隔离,防止跨域信息泄漏。

工作流

当前工作流里没有验证环节。生产环境建议加一个验证节点,检查生成的答案是否真的有检索上下文支撑——这对控制幻觉至关重要。另外,如果要做多轮对话,就需要往图里加记忆节点来持久化对话状态。

重试与回退

LLM 调用失败后要有重试机制。主模型不可用时能自动降级到更小或更便宜的备选模型。超时控制和优雅降级也不能少。

成本控制也值得考虑:简单查询走轻量模型,只在必要时才升级到大模型。

可观测性与评估

日志要记全——检索分数、命中的文档、响应延迟、Token 消耗,这些都得有。定期做离线评估,准备好测试数据集,跑检索质量和回答质量的指标。幻觉监控单独拎出来盯,追踪那些答案脱离检索上下文的 case。

UI 改进

聊天界面得支持历史记录和多轮对话。回答来源要做高亮——用户应该能看到答案是从哪几段文档生成的。再加上反馈按钮,让用户对回答打分,收集回来的数据可以用于后续评估和微调。

部署与基础设施

前后端都做 Docker 容器化,保证部署的可复现性。上云的话 AWS、GCP、Azure 都行,记得配自动扩缩容。端点要上 HTTPS 和 Token 认证(JWT 或 OAuth)。生产环境的 API Key 不要再靠环境变量了,换托管的密钥存储服务。

总结

一个模块化、可扩展的 RAG 架构就搭建完成了,它完全可以从当前的 POC 状态逐步演化成生产级系统。自适应检索、有状态编排、可扩展 API、简洁的交互界面——这几个构建块拼在一起,基本覆盖了现代 LLM 应用的核心架构需求。

by Robi Kumar Tomar

“手把手搭建 Adaptive RAG 系统:从向量检索到 Streamlit 前端全流程”的评论:

还没有评论