0


LangGraph RemoteGraph:本地图与远程图的组合机制解析

把 AI agent 的逻辑拆分到多个独立运行的服务中,听起来复杂做起来也确实容易乱。LangGraph 的 RemoteGraph 特性算是一个干净的方案:本地编排器负责流程控制,远程图服务器承担具体计算,状态管理和控制流的职责边界清晰。

本文要构建的项目是一个循环数学引擎:本地图编排一个远程图:随机选择数学运算和生成随机数。编排器会以两种方式实现——顺序执行和并行执行——以便对比两者的取舍,方便根据场景选择合适的模式。循环持续运行,直到远程图返回

end

架构概览

一个 math_service 远程图负责两种操作,本地 math_orchestrator 在每次迭代中调用它两次,每种操作各一次。下面分别是顺序版本和并行版本的编排器结构:

顺序流——远程图被依次调用两次:

并行流——远程图用 fan-out/fan-in 模式同时被调用两次:

math_service 是远程图,接受

action

字段:

"pick_operation"

返回一个随机数学运算或

end

"generate_number"

返回一个随机整数。

math_orchestrator 是本地图,接受初始数字后每次迭代调用远程图两次(分别传入不同 action),执行数学运算,operation 为

end

时终止。

环境准备

uv/pyproject.toml 配置如下:

 [project]  
name = "langgraph-random-math"  
version = "0.1.0"  
description = "Add your description here"  
requires-python = "==3.13"  
dependencies = [  
    "langgraph",  
    "langgraph-cli",  
    "langgraph-sdk"  
 ]

截至本文编写时 pydantic 与 Python 3.14 及更高版本不兼容,所以这里用 3.13。

两个图都在本地运行——远程图跑在 LangGraph 开发服务器上,本地图作为普通 Python 脚本执行。

步骤 1:math_service——远程图

远程图用条件路由在单个图中处理两种操作。传入状态的

action

字段决定路由方向:

pick_operation

generate_number

创建远程服务目录结构:

 math_service/  
 ├── auth.py  
 ├── graph.py  
 ├── langgraph.json  
 └── .env

math_service/graph.py

 import random  
 from typing import TypedDict  
 from langgraph.graph import StateGraph, START, END
 class MathServiceState(TypedDict):  
     action: str                # "pick_operation" or "generate_number"  
     operation: str             # 结果: "add", "subtract", "multiply", "divide", 或 "end"  
     number: int                # 结果: 随机整数  
     manual_input_chance: float # 请求用户输入的概率 (0.0-1.0)  
     ask_user: bool             # 结果: True = 编排器应提示用户  
 def route_action(state: MathServiceState) -> str:  
     """根据 action 字段路由到相应的节点。"""  
     if state["action"] == "pick_operation":  
         return "pick_operation"  
     elif state["action"] == "generate_number":  
         return "generate_number"  
     else:  
         raise ValueError(f"Unknown action: {state['action']}")  
 def pick_operation(state: MathServiceState) -> dict:  
     """随机选择一个数学运算或 'end' 来停止循环。"""  
     operations = ["add", "subtract", "multiply", "divide", "end"]  
     # 'end' 有 10% 的概率;剩余 90% 在数学运算之间平均分配  
     weights = [9, 9, 9, 9, 4]  
     chosen = random.choices(operations, weights=weights, k=1)[0]  
     return {"operation": chosen}  
 def generate_number(state: MathServiceState) -> dict:  
    """  
    生成随机数或请求手动用户输入。  

    根据 manual_input_chance 进行掷骰:如果结果低于阈值,  
    返回 ask_user=True(不生成数字——编排器应提示用户)。  
    否则,生成并返回一个随机整数。  
    """  
    chance = state.get("manual_input_chance", 0.0)  

    if chance > 0.0 and random.random() < chance:  
        return {"ask_user": True}  

     return {"number": random.randint(1, 20), "ask_user": False}  
 builder = StateGraph(MathServiceState)  
builder.add_node("pick_operation", pick_operation)  
builder.add_node("generate_number", generate_number)  

# 基于 action 字段从 START 进行条件路由  
builder.add_conditional_edges(  
    START,  
    route_action,  
    {  
        "pick_operation": "pick_operation",  
        "generate_number": "generate_number",  
    },  
)  

builder.add_edge("pick_operation", END)  
builder.add_edge("generate_number", END)  

 graph = builder.compile()

math_service/auth.py

 import os  
from langgraph_sdk import Auth  

auth = Auth()  

# 在生产环境中,请使用正规的 JWT 验证库。  
# 此示例使用简单的 token 查找以保持清晰。  
VALID_TOKENS = {  
    os.environ.get("MATH_SERVICE_TOKEN", "dev-token"): {  
        "id": "orchestrator",  
        "name": "Math Orchestrator",  
    },  
 }  
 @auth.authenticate  
async def authenticate(authorization: str | None) -> Auth.types.MinimalUserDict:  
    """验证 Authorization 头中的 Bearer token。"""  
    if not authorization:  
        raise Auth.exceptions.HTTPException(  
            status_code=401, detail="Missing authorization header"  
        )  

    try:  
        scheme, token = authorization.split(" ", 1)  
    except ValueError:  
        raise Auth.exceptions.HTTPException(  
            status_code=401, detail="Invalid authorization format"  
        )  

    if scheme.lower() != "bearer" or token not in VALID_TOKENS:  
        raise Auth.exceptions.HTTPException(  
            status_code=401, detail="Invalid token"  
        )  

    user = VALID_TOKENS[token]  
    return {  
        "identity": user["id"],  
        "is_authenticated": True,  
     }  
 @auth.on  
async def authorize_all(ctx: Auth.types.AuthContext, value: dict):  
    """允许已认证用户执行所有操作。  

    在更复杂的设置中,你可以通过检查 value 载荷来限制  
    每个调用者可以调用哪些操作(pick_operation vs generate_number)。  
    """  
     return None  # None = 允许,不添加额外过滤

这是一个最小认证层:检查 Bearer token 有效性,对已认证的调用者放行所有操作。生产环境中应当用 JWT 验证(

PyJWT

、Auth0 等)替代 token 查表,并按需增加操作级别的授权。

generate_number

节点内部完成决策——根据

manual_input_chance

掷骰后,要么生成数字,要么置

ask_user=True

。编排器检查这个标志并在需要时于本地提示用户。决策逻辑留在服务端,用户交互留在客户端,正是微服务中典型的职责划分方式。

math_service/langgraph.json:

 {  
  "dependencies": ["."],  
  "graphs": {  
    "math_service": "./graph.py:graph"  
  },  
  "auth": {  
    "path": "./auth.py:auth"  
  },  
  "env": ".env"  
 }

创建

.env

文件写入服务 token:

 MATH_SERVICE_TOKEN=dev-token

在端口 2024 启动服务器:

 cd math_service  
 langgraph dev --port 2024 --no-browser

可以针对运行中的服务器测试两种操作:

 from langgraph.pregel.remote import RemoteGraph  

# 不带 token — 应该返回 401 失败  
try:  
    bad_service = RemoteGraph("math_service", url="http://localhost:2024")  
    bad_service.invoke({  
        "action": "pick_operation", "operation": "", "number": 0,  
        "manual_input_chance": 0.0, "ask_user": False,  
    })  
    print("❌ Should have failed without token!")  
except Exception as e:  
    print(f"✅ Correctly blocked: {e}")  

# 带有效 token — 应该成功  
service = RemoteGraph(  
    "math_service",  
    url="http://localhost:2024",  
    headers={"Authorization": "Bearer dev-token"},  
)  

# 测试: 选择一个运算  
result = service.invoke({  
    "action": "pick_operation", "operation": "", "number": 0,  
    "manual_input_chance": 0.0, "ask_user": False,  
})  
print(result["operation"])  # 例如 'multiply'  

# 测试: 生成一个数字(自动模式)  
result = service.invoke({  
    "action": "generate_number", "operation": "", "number": 0,  
    "manual_input_chance": 0.0, "ask_user": False,  
})  
print(result["number"], result["ask_user"])  # 例如 14, False  

# 测试: 生成一个数字(始终询问用户)  
result = service.invoke({  
    "action": "generate_number", "operation": "", "number": 0,  
    "manual_input_chance": 1.0, "ask_user": False,  
})  
 print(result["ask_user"])  # True — 编排器应提示用户

步骤 2:本地编排器图

接下来构建编排器,分顺序和并行两个版本以便对照。两者共享状态定义、远程图连接和节点函数,全部抽取到公共模块

shared.py

中。差异只在图的边如何连接。

目录结构:

 math_orchestrator/  
 ├── shared.py  
 ├── shared_resilient.py  
 ├── orchestrator_sequential.py  
 ├── orchestrator_parallel.py  
 └── orchestrator_parallel_resilient.py

math_orchestrator/shared.py 公共状态、连接和节点

 import os  
 from typing import TypedDict, Annotated  
 import operator  
 from langgraph.pregel.remote import RemoteGraph  
 # --- 状态定义 ---  

class OrchestratorState(TypedDict):  
    current_number: float  
    operation: str  
    random_number: int  
    history: Annotated[list[str], operator.add]  
     manual_input_chance: float  # 0.0 = 始终远程, 1.0 = 始终手动  
 # --- 连接远程图 ---  
# 单个远程图处理两种操作。  
# 认证 token 从环境变量加载。  

math_service = RemoteGraph(  
    "math_service",  
    url=os.environ.get("MATH_SERVICE_URL", "http://localhost:2024"),  
    headers={"Authorization": f"Bearer {os.environ.get('MATH_SERVICE_TOKEN', '')}"},  
 )  
 # --- 节点函数 ---  

def build_initial_state(current_number: float, manual_input_chance: float) -> dict:  
    """构建 graph.invoke() 的初始状态字典。"""  
    if manual_input_chance == 0.0:  
        mode = "automatic"  
    elif manual_input_chance == 1.0:  
        mode = "manual"  
    else:  
        mode = f"mixed ({int(manual_input_chance * 100)}% manual)"  

    return {  
        "current_number": current_number,  
        "operation": "",  
        "random_number": 0,  
        "history": [f"Starting number: {current_number} (mode: {mode})"],  
        "manual_input_chance": manual_input_chance,  
     }  
 def get_operation(state: OrchestratorState) -> dict:  
    """使用 action='pick_operation' 调用 math_service。"""  
    result = math_service.invoke({  
        "action": "pick_operation",  
        "operation": "",  
        "number": 0,  
        "manual_input_chance": 0.0,  
        "ask_user": False,  
    })  
    op = result["operation"]  
    print(f"  → Operation: {op}")  
     return {"operation": op}  
 def _prompt_user_number(state: OrchestratorState) -> int:  
    """通过 stdin 提示用户输入一个数字。"""  
    op = state.get("operation", "?")  
    current = state.get("current_number", 0)  
    while True:  
        raw = input(  
            f"  Current: {current} | Operation: {op} | Enter a number: "  
        )  
        try:  
            return int(raw)  
        except ValueError:  
             print("  Please enter a valid integer.")  
 def get_random_number(state: OrchestratorState) -> dict:  
    """  
    获取数学运算中要使用的下一个数字。  

    使用 action='generate_number' 调用 math_service,同时传递  
    manual_input_chance。远程图决定是生成一个数字还是请求  
    手动输入(通过 ask_user 标志)。  
    如果 ask_user 为 True,编排器在本地提示用户。  
    """  
    chance = state.get("manual_input_chance", 0.0)  

    result = math_service.invoke({  
        "action": "generate_number",  
        "operation": "",  
        "number": 0,  
        "manual_input_chance": chance,  
        "ask_user": False,  
    })  

    if result.get("ask_user"):  
        num = _prompt_user_number(state)  
        print(f"  → Number: {num} (manual)")  
        return {"random_number": num}  

    num = result["number"]  
    print(f"  → Number: {num}")  
     return {"random_number": num}  
 def execute_operation(state: OrchestratorState) -> dict:  
    """对 current_number 执行数学运算。"""  
    current = state["current_number"]  
    op = state["operation"]  
    num = state["random_number"]  

    if op == "add":  
        new_number = current + num  
        symbol = "+"  
    elif op == "subtract":  
        new_number = current - num  
        symbol = "-"  
    elif op == "multiply":  
        new_number = current * num  
        symbol = "×"  
    elif op == "divide":  
        if num == 0:  
            num = 1  
        new_number = round(current / num, 2)  
        symbol = "÷"  
    else:  
        new_number = current  
        symbol = "?"  

    entry = f"  {current} {symbol} {num} = {new_number}"  
    print(entry)  

    return {  
        "current_number": new_number,  
        "history": [entry],  
     }
get_operation

get_random_number

调用的是同一个

math_service

,只是传入不同的

action

值。编排器视角下,远程图是一个支持多种操作的单一端点。

下面看两种不同的图连接方式。每个编排器文件都很简短——业务逻辑全在

shared.py

里,编排器文件只关心拓扑结构。

顺序执行

math_orchestrator/orchestrator_sequential.py

顺序版本先调用

get_operation

,拿到

end

就直接终止,无需再去取随机数。非

end

的情况下继续调用

get_random_number

execute_operation

,然后循环回来。

 import argparse  
from langgraph.graph import StateGraph, START, END  
from shared import (  
    OrchestratorState,  
    build_initial_state,  
    get_operation,  
    get_random_number,  
    execute_operation,  
 )  
 # --- 路由逻辑 ---  
   
 def should_continue(state: OrchestratorState) -> str:  
     """获取操作后,决定:继续还是停止。"""  
     if state.get("operation") == "end":  
         return "finish"  
     return "continue"  
 # --- 构建图 ---  

builder = StateGraph(OrchestratorState)  

# 添加节点  
builder.add_node("get_operation", get_operation)  
builder.add_node("get_random_number", get_random_number)  
builder.add_node("execute_operation", execute_operation)  

# 定义边 — 顺序链  
builder.add_edge(START, "get_operation")  

# 获取操作后,决定:继续还是结束?  
builder.add_conditional_edges(  
    "get_operation",  
    should_continue,  
    {  
        "continue": "get_random_number",  
        "finish": END,  
    },  
)  

builder.add_edge("get_random_number", "execute_operation")  

# 执行后,循环回到 get_operation  
builder.add_edge("execute_operation", "get_operation")  

# 编译  
 graph = builder.compile()  
 # --- 运行 ---  

if __name__ == "__main__":  
    parser = argparse.ArgumentParser()  
    parser.add_argument(  
        "--start-number", type=float, default=None,  
        help="Initial number to start with (prompts if not provided)",  
    )  
    parser.add_argument(  
        "--manual-input", action="store_true",  
        help="Always prompt user for numbers (shorthand for --manual-input-chance 1.0)",  
    )  
    parser.add_argument(  
        "--manual-input-chance", type=float, default=0.0,  
        help="Probability (0.0-1.0) of prompting user for each number (default: 0.0)",  
    )  
    args = parser.parse_args()  

    start = args.start_number  
    if start is None:  
        start = float(input("Enter starting number: "))  

    chance = 1.0 if args.manual_input else args.manual_input_chance  

    result = graph.invoke(build_initial_state(  
        current_number=start,  
        manual_input_chance=chance,  
    ))  

    print("\n🧮 Math Engine Complete! (Sequential)\n")  
    print("Computation History:")  
    for entry in result["history"]:  
        print(entry)  
     print(f"\nFinal Result: {result['current_number']}")

顺序流的好处是逻辑直白,而且最后一次迭代拿到

end

时可以直接跳过取随机数的调用,省掉一次无用的 HTTP 请求。代价是每轮迭代的两次远程调用必须串行,一个等另一个。

并行执行

math_orchestrator/orchestrator_parallel.py

并行版本利用 LangGraph 的 fan-out/fan-in 模式。

get_operation

get_random_number

在同一个 superstep 中同时执行,两者都完成后

execute_operation

再决定是继续 fan-out 还是终止。

 import argparse  
from langgraph.graph import StateGraph, START, END  
from shared import (  
    OrchestratorState,  
    build_initial_state,  
    get_operation,  
    get_random_number,  
    execute_operation,  
 )  
 # --- 路由逻辑 ---  

def should_continue(state: OrchestratorState) -> list[str] | str:  
    """  
    决定是继续循环还是停止。  
    返回节点名称列表用于 fan-out(并行),  
    或返回 END 以终止。  
    """  
    if state.get("operation") == "end":  
        return END  
    # Fan-out: 并行路由到两个节点  
     return ["get_operation", "get_random_number"]  
 # --- 构建图 ---  

builder = StateGraph(OrchestratorState)  

# 添加节点  
builder.add_node("get_operation", get_operation)  
builder.add_node("get_random_number", get_random_number)  
builder.add_node("execute_operation", execute_operation)  

# 定义边  
# Fan-out: START 并行发送到两个节点  
builder.add_edge(START, "get_operation")  
builder.add_edge(START, "get_random_number")  

# Fan-in: 两个节点都必须完成后 execute_operation 才能运行  
builder.add_edge("get_operation", "execute_operation")  
builder.add_edge("get_random_number", "execute_operation")  

# 执行后,决定:再次 fan-out,还是结束  
builder.add_conditional_edges(  
    "execute_operation",  
    should_continue,  
    ["get_operation", "get_random_number", END],  
)  

# 编译  
 graph = builder.compile()  
 # --- 运行 ---  

if __name__ == "__main__":  
    parser = argparse.ArgumentParser()  
    parser.add_argument(  
        "--start-number", type=float, default=None,  
        help="Initial number to start with (prompts if not provided)",  
    )  
    parser.add_argument(  
        "--manual-input", action="store_true",  
        help="Always prompt user for numbers (shorthand for --manual-input-chance 1.0)",  
    )  
    parser.add_argument(  
        "--manual-input-chance", type=float, default=0.0,  
        help="Probability (0.0-1.0) of prompting user for each number (default: 0.0)",  
    )  
    args = parser.parse_args()  

    start = args.start_number  
    if start is None:  
        start = float(input("Enter starting number: "))  

    chance = 1.0 if args.manual_input else args.manual_input_chance  

    result = graph.invoke(build_initial_state(  
        current_number=start,  
        manual_input_chance=chance,  
    ))  

    print("\n🧮 Math Engine Complete! (Parallel)\n")  
    print("Computation History:")  
    for entry in result["history"]:  
        print(entry)  
     print(f"\nFinal Result: {result['current_number']}")

运行结果

打开两个终端窗口:

终端 1——启动远程 math_service(如果此前没启动的话):

 cd math_service  
 langgraph dev --port 2024 --no-browser

终端 2——运行编排器(任选其一):

 cd math_orchestrator  

# 选项 A: 顺序 — 提示输入起始数字  
python orchestrator_sequential.py  

# 选项 B: 并行 — 通过参数指定起始数字  
python orchestrator_parallel.py --start-number 100  

# 任何选项配合手动输入模式 — 每次提示你输入数字:  
python orchestrator_sequential.py --start-number 100 --manual-input  

# 混合模式 — 每次迭代有 50% 的概率提示你:  
 python orchestrator_sequential.py --start-number 100 --manual-input-chance 0.5

一次典型运行的输出如下:

   → Operation: subtract  
  → Number: 10  
  100.0 - 10 = 90.0  
  → Operation: subtract  
  Current: 90.0 | Operation: subtract | Enter a number: 1  
  → Number: 1 (manual)  
  90.0 - 1 = 89.0  
  → Operation: add  
  Current: 89.0 | Operation: add | Enter a number: 1  
  → Number: 1 (manual)  
  89.0 + 1 = 90.0  
  → Operation: divide  
  → Number: 17  
  90.0 ÷ 17 = 5.29  
  → Operation: add  
  → Number: 5  
  5.29 + 5 = 10.29  
  → Operation: divide  
  Current: 10.29 | Operation: divide | Enter a number: 1  
  → Number: 1 (manual)  
  10.29 ÷ 1 = 10.29  
  → Operation: subtract  
  Current: 10.29 | Operation: subtract | Enter a number: 1  
  → Number: 1 (manual)  
  10.29 - 1 = 9.29  
  → Operation: multiply  
  Current: 9.29 | Operation: multiply | Enter a number: 1  
  → Number: 1 (manual)  
  9.29 × 1 = 9.29  
  → Operation: multiply  
  Current: 9.29 | Operation: multiply | Enter a number: 2  
  → Number: 2 (manual)  
  9.29 × 2 = 18.58  
  → Operation: multiply  
  → Number: 10  
  18.58 × 10 = 185.79999999999998  
  → Operation: end  

🧮 Math Engine Complete! (Sequential)  

Computation History:  
Starting number: 100.0 (mode: mixed (50% manual))  
  100.0 - 10 = 90.0  
  90.0 - 1 = 89.0  
  89.0 + 1 = 90.0  
  90.0 ÷ 17 = 5.29  
  5.29 + 5 = 10.29  
  10.29 ÷ 1 = 10.29  
  10.29 - 1 = 9.29  
  9.29 × 1 = 9.29  
  9.29 × 2 = 18.58  
  18.58 × 10 = 185.79999999999998  

 Final Result: 185.79999999999998

运算和数字都由远程服务随机生成,每次运行的结果不同。

从控制台到生产环境:使用 interrupt

input()

适合本地脚本调试。到了生产环境——编排器可能藏在 REST API、Web UI 或聊天界面后面——没有控制台可用。LangGraph 对此有一个一等原语:

interrupt

机制不复杂:节点调用

interrupt()

时 LangGraph 暂停整个图,将完整状态写入 checkpoint,然后把控制权交还给调用方。调用方(API 服务、Web 应用等)拿到暂停信号后向用户展示提示,收到响应后用

Command(resume=...)

恢复执行。图从

interrupt()

调用处精确恢复,哪怕过了几个小时、换了一台机器也没问题。

以下是用

interrupt

替换

input()

get_random_number

的写法:

math_orchestrator/shared_interrupt.py(相关摘录)

 import sqlite3  
 from langgraph.types import interrupt, Command  
 from langgraph.checkpoint.sqlite import SqliteSaver  
 from langgraph.pregel.remote import RemoteGraph  
 math_service = RemoteGraph(  
     "math_service",  
     url="http://localhost:2024",  
 )  
 def get_random_number(state):  
    """  
    获取下一个数字 — 通过远程图或人工中断。  

    当远程图返回 ask_user=True 时,我们不调用 input(),  
    而是调用 interrupt(),它会暂停整个图并向调用应用程序  
    呈现一个提示。  
    """  
    chance = state.get("manual_input_chance", 0.0)  

    result = math_service.invoke({  
        "action": "generate_number",  
        "operation": "",  
        "number": 0,  
        "manual_input_chance": chance,  
        "ask_user": False,  
    })  

    if result.get("ask_user"):  
        # 暂停图 — 调用者接收此提示  
        num = interrupt({  
            "prompt": "Enter a number",  
            "current_number": state.get("current_number"),  
            "operation": state.get("operation"),  
        })  
        print(f"  → Number: {num} (manual)")  
        return {"random_number": int(num)}  

    num = result["number"]  
    print(f"  → Number: {num}")  
     return {"random_number": num}

编译图时必须附带 checkpointer(状态持久化),调用时必须指定 thread_id(标识具体会话):

 # 带 checkpointer 编译  
checkpointer = SqliteSaver(sqlite3.connect("math_engine.db"))  
graph = builder.compile(checkpointer=checkpointer)  

# 启动新线程  
config = {"configurable": {"thread_id": "session-42"}}  

# 首次调用 — 运行直到 interrupt() 被调用  
result = graph.invoke(  
    build_initial_state(current_number=100, manual_input_chance=1.0),  
    config=config,  
)  

# 图现在已暂停。result["__interrupt__"] 包含提示:  
# [Interrupt(value={"prompt": "Enter a number", "current_number": 100, ...})]  

# ... 时间流逝,用户通过 Web UI、API 等提供输入 ...  

# 使用用户的值恢复  
result = graph.invoke(Command(resume=42), config=config)  

# 图从 interrupt() 被调用的地方继续执行,  
 # num = 42,并运行直到下一个 interrupt 或 END。

几个要点。

interrupt()

input()

目的相同,区别在于前者走 HTTP 而非 stdin——传入一个 payload(提示、上下文等),调用方通过

Command(resume=...)

回传用户输入。checkpointer 负责持久化图状态,LangGraph 支持 SQLite、Postgres 等多种后端。thread_id 用来标识会话,多个用户可以各自持有独立的暂停/运行中的图实例。远程图和图的连接方式无需任何改动,变化只发生在节点函数和调用模式上。

保护线程:认证与授权

如果 thread ID 是保护暂停会话的唯一手段,任何猜中或截获了 thread ID 的人都能恢复别人的图、注入自己的值。LangGraph Platform 对此有内置的认证与授权层。

认证系统分两步走。

@auth.authenticate

处理程序作为中间件在每个请求上运行,验证调用者的凭据(JWT token、API 密钥、OAuth2 等)并返回用户身份;

@auth.on

处理程序执行资源级访问控制,给每个线程打上所有者标记,过滤访问权限,确保用户只能看到和恢复自己的线程。

线程级安全的实现如下:

 from langgraph_sdk import Auth  

auth = Auth()  

@auth.authenticate  
async def authenticate(authorization: str) -> Auth.types.MinimalUserDict:  
    """验证 Bearer token 并返回用户信息。"""  
    token = authorization.split(" ", 1)[1]  
    user = await verify_jwt(token)  # 你的 JWT 验证逻辑  
    return {  
        "identity": user["sub"],  
        "is_authenticated": True,  
    }  

@auth.on.threads.create  
async def on_thread_create(  
    ctx: Auth.types.AuthContext,  
    value: Auth.types.on.threads.create.value,  
):  
    """为每个新线程标记创建者的身份。  

    `value` 是线程创建载荷 — 一个包含来自 API 请求的字段的字典:  
    thread_id, metadata, if_exists 等。  
    我们修改 value["metadata"] 以在存储之前标记所有者。  
    返回值是 LangGraph 写入线程 metadata 的元数据过滤器。  
    """  
    value.setdefault("metadata", {})["owner"] = ctx.user.identity  
    return {"owner": ctx.user.identity}  

@auth.on.threads.read  
async def on_thread_read(  
    ctx: Auth.types.AuthContext,  
    value: Auth.types.on.threads.read.value,  
):  
    """过滤线程,使用户只能看到自己的。  

    `value` 是读取请求载荷 — 一个包含 thread_id 和来自  
    API 请求的任何 metadata 的字典。  

    返回值不是检查 — 而是查询过滤器。  
    LangGraph 在存储层应用它:只有 metadata.owner 与  
    ctx.user.identity 匹配的线程才会被返回。  
    其他用户拥有的线程是不可见的,而不仅仅是被阻止。  
    """  
    return {"owner": ctx.user.identity}  

@auth.on.threads.create_run  
async def on_thread_resume(  
    ctx: Auth.types.AuthContext,  
    value: Auth.types.on.threads.create_run.value,  
):  
    """过滤用户可以在哪些线程上恢复运行。  

    `value` 是运行创建载荷 — 一个包含 thread_id, assistant_id,  
    input, command, metadata, config 等的字典。  

    相同的过滤器机制:LangGraph 仅在线程的 metadata.owner  
    与返回的过滤器匹配时才允许运行。如果用户尝试恢复  
    另一个用户的线程,平台会拒绝请求,因为该线程  
    未通过过滤器。  
    """  
    metadata = value.setdefault("metadata", {})  
    metadata["owner"] = ctx.user.identity  
     return {"owner": ctx.user.identity}

langgraph.json

中注册:

 {  
   "auth": {  
     "path": "src/security/auth.py:auth"  
   }  
 }

配置完成后,即使攻击者拿到了其他用户的 thread ID 也无法读取线程状态或恢复运行——授权处理程序会因为 owner metadata 不匹配而拒绝请求。过滤发生在平台层,不在图代码中,无法通过构造 API 请求绕过。

生产部署时 LangGraph Platform 可以对接 Auth0、Supabase 以及任何 OAuth2/JWT 认证体系。记住一点:thread ID 是标识符,不是密钥——安全保障来自认证层对访问权限的把控。

前面基于控制台

input()

的版本已经是这一模式的可运行原型。迁移到

interrupt

只需改动节点函数和调用模式,架构其余部分——远程图、fan-out/fan-in、错误处理——全部保持原样。

RemoteGraph 的工作原理

langgraph.pregel.remote

中的

RemoteGraph

类是整个组合能力的底层支撑。它实现的接口和本地编译的图完全一致,可以

.invoke()

.stream()

,也可以直接嵌入为另一个图的子图节点。通信通过 HTTP对接 LangGraph Server API。

 from langgraph.pregel.remote import RemoteGraph  

remote = RemoteGraph(  
    "math_service",          # 来自 langgraph.json 的 assistant ID  
    url="http://localhost:2024",  
)  

# 像使用普通图一样使用它 — action 字段控制行为  
 result = remote.invoke({"action": "pick_operation", "operation": "", "number": 0})
RemoteGraph

遵循

Runnable

接口,可以直接作为节点添加到另一个图中:

 builder.add_node("my_remote_node", remote_graph)

编排器图不需要了解远程图的内部实现细节——它可以是一个简单状态机、一个 LLM 驱动的 agent,或者任何介于两者之间的东西。而通过条件路由,一个远程图部署就能承载多种操作。

第二个调用依赖于第一个调用的结果时选顺序执行,或者想在最后一轮

end

时省掉无用的远程调用也该选顺序。两个调用互相独立、想缩短总耗时就选并行——在生产环境中远程图调用可能涉及 LLM 推理或数据库查询,并行执行差不多能把每轮延迟砍掉一半。

错误处理

并行执行带来一个自然的问题:远程图调用失败了怎么办?

math_service

临时挂掉\网络请求超时,这些情况都需要考虑。LangGraph 的处理提供了多种应对策略。

默认行为:原子 superstep

LangGraph 中并行节点在一个 superstep 里共同执行。superstep 中任何一个节点抛出异常,整个 superstep 原子性失败,不会有部分状态写入。假如

get_random_number

成功、

get_operation

失败,两边的结果都不会写入状态,避免了有随机数却没运算符这种不一致。

配了 checkpointer 的情况下 LangGraph 会在内部保存成功节点的结果。恢复执行时只有失败分支重试,成功分支的工作不必重复。

策略 1:RetryPolicy(图原生重试)

最干净的做法是对容易出错的节点附加

RetryPolicy

。LangGraph 接管重试循环,支持配置尝试次数、退避策略和抖动。只有失败分支重试,成功的并行节点不会重新执行。

重试全部耗尽后异常向上传播,图调用失败。对网络超时、5xx 错误这类瞬态故障,这是恰当的处理方式。

策略 2:节点内 Try/Except(降级处理)

需要图在远程服务不可用时仍然继续运行的场景下,在节点内部捕获异常并返回降级值即可。操作调用失败则引擎停止循环;数字生成调用失败则用安全默认值代替。

策略 3:两者结合(生产环境推荐)

生产中通常既要重试也要降级。

RetryPolicy

透明处理瞬态故障,重试全部耗尽后异常才落到节点内部的

try/except

块,由它提供兜底逻辑。

以下是三种策略的完整可运行示例。

总结

RemoteGraph 让分布式 agent 架构的组合变得相当简洁——顺序还是并行的连接方式随意切换,

RetryPolicy

加上节点内降级逻辑构成两层容错。单个远程图通过条件路由就能承载多种操作,基础设施不必铺得很大,编排逻辑也能保持清晰。

这个数学引擎作为 demo 虽然简单,展示的模式却可以直接迁移到正式系统——微服务化的 AI 编排,每个图作为一个独立部署的 agent 逻辑单元,根据延迟和成本需求选择合适的执行策略。

本文完整代码:

https://github.com/sentipy/langgraph_random_math

by Alexander Machekhin

“LangGraph RemoteGraph:本地图与远程图的组合机制解析”的评论:

还没有评论