0


LangGraph 入门:用图结构构建你的第一个多智能体工作流

LangGraph 设计的一个核心是:多智能体工作流本质上是图结构,而非线性链。早期 LLM 应用普遍采用"提示 → LLM → 响应"的线性模式,但这种架构难以应对真实智能体系统的复杂性。比如生产环境中的多智能体协作需要分支(基于数据选择不同执行路径)、循环(支持重试与迭代优化)、汇合(多个智能体向共享状态写入数据),以及条件路由(根据执行结果动态决定后续流程)。

LangGraph 如何表示工作流

LangGraph 里每个工作流都是一个 StateGraph——本质上是有向图。节点就是智能体,或者说处理状态的函数;边是智能体之间的转换;状态则是在整个图中流动的共享数据结构。

 from langgraph.graph import StateGraph, END  
from typing import TypedDict

# Define your state schema  
class IncidentState(TypedDict):  
    incident_id: str  
    current_metrics: dict  
    proposed_solution: dict  
    issue_resolved: bool  
    retry_count: int

# Create the graph  
workflow = StateGraph(IncidentState)

# Add agent nodes  
workflow.add_node("diagnose", diagnose_agent)  
workflow.add_node("plan_fix", planning_agent)  
workflow.add_node("execute_fix", worker_agent)  
workflow.add_node("verify", verification_agent)

# Define transitions  
workflow.add_edge("diagnose", "plan_fix")  
workflow.add_edge("plan_fix", "execute_fix")  
workflow.add_edge("execute_fix", "verify")

# Conditional: retry or exit  
workflow.add_conditional_edges(  
    "verify",  
    lambda state: "resolved" if state["issue_resolved"] else "retry",  
    {  
        "resolved": END,  
        "retry": "diagnose"  # Loop back  
    }  
)

 workflow.set_entry_point("diagnose")

这样做的好处非常明显:图本身就可以当作开发文档文档,一眼能看懂流程;加减节点不用动协调逻辑;状态有类型约束;循环有内置的终止条件,不会跑成死循环。

节点、边、状态三者各司其职。节点封装具体的逻辑操作,只管做事;边定义节点间怎么交互、谁先谁后;状态承载共享上下文,让节点可以保持无状态。这种职责分离让系统好理解、好调试、好扩展,节点还能跨工作流复用。

运行时到底发生了什么

图定义是声明式的,但真正让编排变得有意义的是运行时行为。

工作流启动后,LangGraph 用状态机来管理执行。首先从入口节点的初始状态开始,然后调用智能体函数并传入当前状态。智能体返回的是增量更新而不是整个状态的替换,LangGraph 拿到更新后原子性地合并到当前状态,接着根据图定义决定下一个节点,同时创建检查点把当前状态和执行位置持久化下来。这个过程一直重复,直到走到 END 节点或者达到最大迭代次数。

有一点很关键:智能体永远不会直接改共享状态。它们拿到的是只读副本,算完之后返回更新,实际的状态修改由 LangGraph 来做,可以保证了原子性和一致性。

边遍历机制

边定义了哪些转换是允许的,但具体什么时候转换由运行时决定。

静态边没什么花样:

 workflow.add_edge("diagnose", "plan_fix")

diagnose 节点跑完、检查点创建好之后,LangGraph 立刻拿更新后的状态去调 plan_fix。

条件边就灵活多了:

 workflow.add_conditional_edges(  
     "verify",  
     route_function,  
     {"retry": "diagnose", "resolved": END}  
 )

verify 完成后,LangGraph 调用 route_function(state) 来判断下一步走哪条边。函数返回 retry 就回到 diagnose,返回 resolved 就结束。

任何节点在执行前它的所有前置节点必须已经完成并创建了检查点,这就避免了 Pub/Sub 系统里常见的那种"前面还没跑完后面就开始了"的问题。

状态管理的特殊之处

LangGraph 的状态跟传统系统不太一样。

它不是存在 Redis 或数据库里让智能体直接访问的共享内存。LangGraph 在内部维护状态,给智能体的是受控访问。对智能体来说状态是不可变的——拿到的是快照,不能直接改,只能返回想要的变更。

多个智能体并行跑的时候(通过并行边),LangGraph 收集所有更新,用 reducer 原子性地一起应用。读-修改-写的竞态条件就这么解决了。

每个检查点还会创建一个状态版本。想看执行历史中任意时刻的状态?直接查检查点就行,这就是所谓的时间旅行调试。

检查点持久化

检查点不只是日志,它们是恢复点。

每个检查点记录完整的状态快照、当前在图中的位置(刚执行完哪个节点)、还有元数据(时间戳、创建检查点的节点、执行路径)。

创建时机有三个:每个节点成功完成后、条件边评估前、以及工作流暂停时(比如等人工审批)。

这样如果节点执行到一半崩了,可以从最后一个检查点重试就行;长时间运行的工作流可以暂停再恢复,进度不会丢;调试的时候能从任意检查点开始重放。

一个完整的运行时示例

假设用户发起请求:"修复服务延迟问题"。

 T0: Workflow starts  
    - Initial state: {incident_id: "INC-123", retry_count: 0}  
    - Entry point: "diagnose"

T1: "diagnose" node executes  
    - Receives: {incident_id: "INC-123", retry_count: 0}  
    - Agent calls Data Agent, fetches metrics  
    - Returns: {current_metrics: {cpu: 95, latency: 500ms}}  
    - LangGraph merges: state now has metrics  
    - Checkpoint created  
      
T2: Static edge triggers: "diagnose" → "plan_fix"  
    - "plan_fix" node executes  
    - Receives merged state (incident_id + retry_count + current_metrics)  
    - Agent calls Knowledge Agent for runbook  
    - Returns: {proposed_solution: "restart_service"}  
    - LangGraph merges  
    - Checkpoint created

T3: Static edge triggers: "plan_fix" → "execute_fix"  
    - "execute_fix" node executes  
    - Calls Worker Agent  
    - Returns: {action_status: "completed"}  
    - Checkpoint created

T4: Static edge triggers: "execute_fix" → "verify"  
    - "verify" node executes  
    - Calls Data Agent again  
    - Returns: {current_metrics: {cpu: 90, latency: 480ms}, issue_resolved: false}  
    - Checkpoint created

T5: Conditional edge evaluation  
    - LangGraph calls route function with current state  
    - route_function checks: state["issue_resolved"] == false and retry_count < 3  
    - Returns: "retry"  
    - LangGraph increments retry_count  
    - Routes back to "diagnose" (cycle)

T6: "diagnose" executes again (retry [#1](#1))  
     - Process repeats with updated state...

状态在节点间累积——指标、方案、操作结果都在里面。每个节点都能看到之前所有节点产出的完整信息。重试逻辑是图结构强制的,不是写在智能体代码里。出了故障检查点可以让程序随时恢复运行。

用 LangGraph 的话,智能体只管返回自己的更新。协调、状态合并、路由、持久化,运行时全包了。

关键架构模式

传统多智能体系统喜欢累积对话历史:

 # Common pattern - append-only log  
 messages= [  
     {"role": "user", "content": "Service X is slow"},  
     {"role": "data", "content": "CPU at 95%"},  
     {"role": "knowledge", "content": "Try restarting"},  
     {"role": "action", "content": "Restarted service"},  
     ...  
 ]

这东西会无限增长,智能体每次都得在历史里翻来翻去找有用的数据。

LangGraph 换了个思路,状态就是当前世界的快照:

 classState(TypedDict):  
     # Current values, not history  
     incident_id: str  
     current_cpu: float  
     recommended_action: str  
     action_status: str  
     retry_count: int

智能体读当前值、更新当前值。历史通过检查点单独维护,调试用得着,但工作状态保持精简。访问状态 O(1),不用解析历史;数据所有权清晰,一眼看出哪个字段归谁管;推理也简单,当前状态是啥就是啥。

Reducer 解决并行协调

多个智能体要往同一个状态字段写数据怎么办?LangGraph 提供 reducer——专门合并并发更新的函数。

传统 A2A 模型里,智能体得自己搞协调:抢锁、读-修改-写、重试、冲突检测。这套东西各团队实现得五花八门,一旦出现部分故障就容易出问题。Reducer 把冲突解决挪到编排层,智能体级别的协调逻辑直接省掉。

比如说下面的例子,三个监控智能体并行检查不同的服务副本:

 fromtypingimportAnnotated  
 fromoperatorimportadd
 
 classState(TypedDict):  
     # Reducer: combine all health check results  
     health_checks: Annotated[list, add]

三个 Data Agent 各自返回健康检查结果,reducer(这里就是列表的 add 操作)自动把三份结果合成一个列表。没有智能体需要知道其他智能体的存在,不用抢锁,不用协调更新。

没有 reducer 的话,需要手动加锁防覆盖、写协调逻辑合并结果、还得担心更新丢失。有了 reducer,编排层自动处理。

检查点用于调试和恢复

每次节点执行都会创建检查点,状态和执行位置的快照会持久化到 Postgres、Redis 或文件系统。

生产环境出故障了?可以检查检查点的内容,看看每个智能体观察到了什么、做了什么决定。这相当于给智能体工作流装了黑匣子,决策链条一清二楚。

服务器中途崩了也可以从最后一个检查点恢复,不用从头来。对那些要调用昂贵 API 或者收集大量数据的长时间任务来说,这太重要了。

而且工作流可以暂停几小时甚至几天,状态通过检查点保持现有状态,从暂停的地方精确恢复,上下文完整保留。

修改工作流的灵活性

LangGraph的另外一个卖点是工作流改起来容易。

假设初始工作流是 Diagnose → Fix → Verify,现在要加个需求:"修复之前先查一下 Jira 有没有已知问题"。

代码改动就这么点:

 # Add the new agent  
workflow.add_node("check_jira", jira_agent)

# Rewire the flow  
workflow.add_edge("diagnose", "check_jira")  # New path  
workflow.add_conditional_edges(  
    "check_jira",  
    lambda state: "known_issue" if state["jira_ticket"] else "unknown",  
    {  
        "known_issue": "apply_known_fix",  # New path  
        "unknown": "plan_fix"              # Original path  
    }  
 )

单个智能体的实现不用动,状态协调逻辑不用动,检查点处理不用动,错误恢复不用动。

如果换成换成 Pub/Sub 呢?事件路由逻辑要改,完成跟踪要改(现在是 4 个智能体不是 3 个了),状态模式协调要改,所有集成点都得重新测。

再看重试逻辑的修改。原来是最多重试 3 次:

 # Before  
 workflow.add_conditional_edges(  
     "verify",  
     lambda state: "retry" if state["retry_count"] < 3 else "end",  
     {"retry": "diagnose", "end": END}  
 )

新需求:"只有临时性错误(网络问题)才重试,永久性错误(配置问题)不重试"。改条件函数就行:

 # After - just change the condition function  
def should_retry(state):  
    if state["issue_resolved"]:  
        return "success"  
    if state["error_type"] == "config":  
        return "escalate"  # Don't retry config errors  
    if state["retry_count"] >= 3:  
        return "max_retries"  
    return "retry"

workflow.add_conditional_edges(  
    "verify",  
    should_retry,  
    {  
        "success": END,  
        "retry": "diagnose",  
        "escalate": "human_review",  
        "max_retries": "alert_team"  
    }  
 )

业务逻辑在工作流结构里一目了然,改起来也顺手。

LangGraph 支持的典型模式

生成的方案不够好,可以直接加个循环:

 workflow.add_node("generate_solution", llm_agent)  
workflow.add_node("validate_solution", validation_agent)  
workflow.add_node("refine_solution", refinement_agent)

workflow.add_conditional_edges(  
    "validate_solution",  
    lambdastate: "valid"ifstate["solution_quality"] >0.8else"refine",  
    {  
        "valid": "execute_fix",  
        "refine": "refine_solution"  
    }  
)

 workflow.add_edge("refine_solution", "generate_solution")  # Loop back

方案不断迭代,直到质量达标。

并行信息收集时需要同时从多个来源拉数据:

 fromlanggraph.graphimportSTART

# Parallel nodes  
workflow.add_node("fetch_metrics", data_agent)  
workflow.add_node("fetch_logs", elasticsearch_agent)  
workflow.add_node("fetch_config", knowledge_agent)

# All start in parallel  
workflow.add_edge(START, "fetch_metrics")  
workflow.add_edge(START, "fetch_logs")  
workflow.add_edge(START, "fetch_config")

# All must complete before analysis  
workflow.add_node("analyze", analysis_agent)  
workflow.add_edge("fetch_metrics", "analyze")  
workflow.add_edge("fetch_logs", "analyze")  
 workflow.add_edge("fetch_config", "analyze")

LangGraph 保证 analyze 节点在三个数据源都拿完之后才开始跑。

高风险操作需要人来进行确认:

 workflow.add_node("propose_fix", planning_agent)  
workflow.add_node("await_approval", approval_gate)  
workflow.add_node("execute_fix", action_agent)

workflow.add_edge("propose_fix", "await_approval")

# Workflow pauses at await_approval  
# State is persisted  
# When human approves, workflow resumes

workflow.add_conditional_edges(  
    "await_approval",  
    lambdastate: "approved"ifstate["human_approved"] else"rejected",  
    {  
        "approved": "execute_fix",  
        "rejected": "propose_alternative"  
    }  
 )

这个确认过程可以等几小时甚至几天,不消耗任何的资源。

什么场景适合 LangGraph

复杂工作流(5 个以上智能体、有条件逻辑、有循环)、业务逻辑经常变、需要事后调试分析、有人工审批或质量门控、长时间任务需要崩溃恢复——这些场景 LangGraph 很合适。

简单的线性流程(A → B → C,没分支)、智能体完全独立不需要协调、对延迟极度敏感(编排开销要控制在 10ms 以内)、或者团队有深厚的分布式系统功底想自己搞状态机——这些场景替代方案也挺好。

总结

编排框架在复杂系统中的价值已经被反复验证:Kubernetes 之于容器、Airflow 之于数据管道、Temporal 之于通用工作流。LangGraph 将同样的理念带入多智能体 AI 领域,提供了 LLM 感知的编排能力。

其核心价值在于:图结构让工作流易于修改和扩展,检查点机制保障了可调试性和故障恢复,reducer 和原子状态更新解决了并行协调难题。开发者可以专注于智能体逻辑本身,而非协调管道的实现细节。

对于正在构建多智能体系统的团队,LangGraph 提供了一条从实验原型到生产系统的可行路径。

作者:ravikiran veldanda

记录[+]

2 小时前 已修改
2 小时前 已修改

“LangGraph 入门:用图结构构建你的第一个多智能体工作流”的评论:

还没有评论