把 AI agent 的逻辑拆分到多个独立运行的服务中,听起来复杂做起来也确实容易乱。LangGraph 的 RemoteGraph 特性算是一个干净的方案:本地编排器负责流程控制,远程图服务器承担具体计算,状态管理和控制流的职责边界清晰。
本文要构建的项目是一个循环数学引擎:本地图编排一个远程图:随机选择数学运算和生成随机数。编排器会以两种方式实现——顺序执行和并行执行——以便对比两者的取舍,方便根据场景选择合适的模式。循环持续运行,直到远程图返回
end
。
架构概览
一个 math_service 远程图负责两种操作,本地 math_orchestrator 在每次迭代中调用它两次,每种操作各一次。下面分别是顺序版本和并行版本的编排器结构:
顺序流——远程图被依次调用两次:
并行流——远程图用 fan-out/fan-in 模式同时被调用两次:
math_service 是远程图,接受
action
字段:
"pick_operation"
返回一个随机数学运算或
end
,
"generate_number"
返回一个随机整数。
math_orchestrator 是本地图,接受初始数字后每次迭代调用远程图两次(分别传入不同 action),执行数学运算,operation 为
end
时终止。
环境准备
uv/pyproject.toml 配置如下:
[project]
name = "langgraph-random-math"
version = "0.1.0"
description = "Add your description here"
requires-python = "==3.13"
dependencies = [
"langgraph",
"langgraph-cli",
"langgraph-sdk"
]
截至本文编写时 pydantic 与 Python 3.14 及更高版本不兼容,所以这里用 3.13。
两个图都在本地运行——远程图跑在 LangGraph 开发服务器上,本地图作为普通 Python 脚本执行。
步骤 1:math_service——远程图
远程图用条件路由在单个图中处理两种操作。传入状态的
action
字段决定路由方向:
pick_operation
或
generate_number
。
创建远程服务目录结构:
math_service/
├── auth.py
├── graph.py
├── langgraph.json
└── .env
math_service/graph.py
import random
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
class MathServiceState(TypedDict):
action: str # "pick_operation" or "generate_number"
operation: str # 结果: "add", "subtract", "multiply", "divide", 或 "end"
number: int # 结果: 随机整数
manual_input_chance: float # 请求用户输入的概率 (0.0-1.0)
ask_user: bool # 结果: True = 编排器应提示用户
def route_action(state: MathServiceState) -> str:
"""根据 action 字段路由到相应的节点。"""
if state["action"] == "pick_operation":
return "pick_operation"
elif state["action"] == "generate_number":
return "generate_number"
else:
raise ValueError(f"Unknown action: {state['action']}")
def pick_operation(state: MathServiceState) -> dict:
"""随机选择一个数学运算或 'end' 来停止循环。"""
operations = ["add", "subtract", "multiply", "divide", "end"]
# 'end' 有 10% 的概率;剩余 90% 在数学运算之间平均分配
weights = [9, 9, 9, 9, 4]
chosen = random.choices(operations, weights=weights, k=1)[0]
return {"operation": chosen}
def generate_number(state: MathServiceState) -> dict:
"""
生成随机数或请求手动用户输入。
根据 manual_input_chance 进行掷骰:如果结果低于阈值,
返回 ask_user=True(不生成数字——编排器应提示用户)。
否则,生成并返回一个随机整数。
"""
chance = state.get("manual_input_chance", 0.0)
if chance > 0.0 and random.random() < chance:
return {"ask_user": True}
return {"number": random.randint(1, 20), "ask_user": False}
builder = StateGraph(MathServiceState)
builder.add_node("pick_operation", pick_operation)
builder.add_node("generate_number", generate_number)
# 基于 action 字段从 START 进行条件路由
builder.add_conditional_edges(
START,
route_action,
{
"pick_operation": "pick_operation",
"generate_number": "generate_number",
},
)
builder.add_edge("pick_operation", END)
builder.add_edge("generate_number", END)
graph = builder.compile()
math_service/auth.py
import os
from langgraph_sdk import Auth
auth = Auth()
# 在生产环境中,请使用正规的 JWT 验证库。
# 此示例使用简单的 token 查找以保持清晰。
VALID_TOKENS = {
os.environ.get("MATH_SERVICE_TOKEN", "dev-token"): {
"id": "orchestrator",
"name": "Math Orchestrator",
},
}
@auth.authenticate
async def authenticate(authorization: str | None) -> Auth.types.MinimalUserDict:
"""验证 Authorization 头中的 Bearer token。"""
if not authorization:
raise Auth.exceptions.HTTPException(
status_code=401, detail="Missing authorization header"
)
try:
scheme, token = authorization.split(" ", 1)
except ValueError:
raise Auth.exceptions.HTTPException(
status_code=401, detail="Invalid authorization format"
)
if scheme.lower() != "bearer" or token not in VALID_TOKENS:
raise Auth.exceptions.HTTPException(
status_code=401, detail="Invalid token"
)
user = VALID_TOKENS[token]
return {
"identity": user["id"],
"is_authenticated": True,
}
@auth.on
async def authorize_all(ctx: Auth.types.AuthContext, value: dict):
"""允许已认证用户执行所有操作。
在更复杂的设置中,你可以通过检查 value 载荷来限制
每个调用者可以调用哪些操作(pick_operation vs generate_number)。
"""
return None # None = 允许,不添加额外过滤
这是一个最小认证层:检查 Bearer token 有效性,对已认证的调用者放行所有操作。生产环境中应当用 JWT 验证(
PyJWT
、Auth0 等)替代 token 查表,并按需增加操作级别的授权。
generate_number
节点内部完成决策——根据
manual_input_chance
掷骰后,要么生成数字,要么置
ask_user=True
。编排器检查这个标志并在需要时于本地提示用户。决策逻辑留在服务端,用户交互留在客户端,正是微服务中典型的职责划分方式。
math_service/langgraph.json:
{
"dependencies": ["."],
"graphs": {
"math_service": "./graph.py:graph"
},
"auth": {
"path": "./auth.py:auth"
},
"env": ".env"
}
创建
.env
文件写入服务 token:
MATH_SERVICE_TOKEN=dev-token
在端口 2024 启动服务器:
cd math_service
langgraph dev --port 2024 --no-browser
可以针对运行中的服务器测试两种操作:
from langgraph.pregel.remote import RemoteGraph
# 不带 token — 应该返回 401 失败
try:
bad_service = RemoteGraph("math_service", url="http://localhost:2024")
bad_service.invoke({
"action": "pick_operation", "operation": "", "number": 0,
"manual_input_chance": 0.0, "ask_user": False,
})
print("❌ Should have failed without token!")
except Exception as e:
print(f"✅ Correctly blocked: {e}")
# 带有效 token — 应该成功
service = RemoteGraph(
"math_service",
url="http://localhost:2024",
headers={"Authorization": "Bearer dev-token"},
)
# 测试: 选择一个运算
result = service.invoke({
"action": "pick_operation", "operation": "", "number": 0,
"manual_input_chance": 0.0, "ask_user": False,
})
print(result["operation"]) # 例如 'multiply'
# 测试: 生成一个数字(自动模式)
result = service.invoke({
"action": "generate_number", "operation": "", "number": 0,
"manual_input_chance": 0.0, "ask_user": False,
})
print(result["number"], result["ask_user"]) # 例如 14, False
# 测试: 生成一个数字(始终询问用户)
result = service.invoke({
"action": "generate_number", "operation": "", "number": 0,
"manual_input_chance": 1.0, "ask_user": False,
})
print(result["ask_user"]) # True — 编排器应提示用户
步骤 2:本地编排器图
接下来构建编排器,分顺序和并行两个版本以便对照。两者共享状态定义、远程图连接和节点函数,全部抽取到公共模块
shared.py
中。差异只在图的边如何连接。
目录结构:
math_orchestrator/
├── shared.py
├── shared_resilient.py
├── orchestrator_sequential.py
├── orchestrator_parallel.py
└── orchestrator_parallel_resilient.py
math_orchestrator/shared.py 公共状态、连接和节点
import os
from typing import TypedDict, Annotated
import operator
from langgraph.pregel.remote import RemoteGraph
# --- 状态定义 ---
class OrchestratorState(TypedDict):
current_number: float
operation: str
random_number: int
history: Annotated[list[str], operator.add]
manual_input_chance: float # 0.0 = 始终远程, 1.0 = 始终手动
# --- 连接远程图 ---
# 单个远程图处理两种操作。
# 认证 token 从环境变量加载。
math_service = RemoteGraph(
"math_service",
url=os.environ.get("MATH_SERVICE_URL", "http://localhost:2024"),
headers={"Authorization": f"Bearer {os.environ.get('MATH_SERVICE_TOKEN', '')}"},
)
# --- 节点函数 ---
def build_initial_state(current_number: float, manual_input_chance: float) -> dict:
"""构建 graph.invoke() 的初始状态字典。"""
if manual_input_chance == 0.0:
mode = "automatic"
elif manual_input_chance == 1.0:
mode = "manual"
else:
mode = f"mixed ({int(manual_input_chance * 100)}% manual)"
return {
"current_number": current_number,
"operation": "",
"random_number": 0,
"history": [f"Starting number: {current_number} (mode: {mode})"],
"manual_input_chance": manual_input_chance,
}
def get_operation(state: OrchestratorState) -> dict:
"""使用 action='pick_operation' 调用 math_service。"""
result = math_service.invoke({
"action": "pick_operation",
"operation": "",
"number": 0,
"manual_input_chance": 0.0,
"ask_user": False,
})
op = result["operation"]
print(f" → Operation: {op}")
return {"operation": op}
def _prompt_user_number(state: OrchestratorState) -> int:
"""通过 stdin 提示用户输入一个数字。"""
op = state.get("operation", "?")
current = state.get("current_number", 0)
while True:
raw = input(
f" Current: {current} | Operation: {op} | Enter a number: "
)
try:
return int(raw)
except ValueError:
print(" Please enter a valid integer.")
def get_random_number(state: OrchestratorState) -> dict:
"""
获取数学运算中要使用的下一个数字。
使用 action='generate_number' 调用 math_service,同时传递
manual_input_chance。远程图决定是生成一个数字还是请求
手动输入(通过 ask_user 标志)。
如果 ask_user 为 True,编排器在本地提示用户。
"""
chance = state.get("manual_input_chance", 0.0)
result = math_service.invoke({
"action": "generate_number",
"operation": "",
"number": 0,
"manual_input_chance": chance,
"ask_user": False,
})
if result.get("ask_user"):
num = _prompt_user_number(state)
print(f" → Number: {num} (manual)")
return {"random_number": num}
num = result["number"]
print(f" → Number: {num}")
return {"random_number": num}
def execute_operation(state: OrchestratorState) -> dict:
"""对 current_number 执行数学运算。"""
current = state["current_number"]
op = state["operation"]
num = state["random_number"]
if op == "add":
new_number = current + num
symbol = "+"
elif op == "subtract":
new_number = current - num
symbol = "-"
elif op == "multiply":
new_number = current * num
symbol = "×"
elif op == "divide":
if num == 0:
num = 1
new_number = round(current / num, 2)
symbol = "÷"
else:
new_number = current
symbol = "?"
entry = f" {current} {symbol} {num} = {new_number}"
print(entry)
return {
"current_number": new_number,
"history": [entry],
}
get_operation
和
get_random_number
调用的是同一个
math_service
,只是传入不同的
action
值。编排器视角下,远程图是一个支持多种操作的单一端点。
下面看两种不同的图连接方式。每个编排器文件都很简短——业务逻辑全在
shared.py
里,编排器文件只关心拓扑结构。
顺序执行
math_orchestrator/orchestrator_sequential.py
顺序版本先调用
get_operation
,拿到
end
就直接终止,无需再去取随机数。非
end
的情况下继续调用
get_random_number
和
execute_operation
,然后循环回来。
import argparse
from langgraph.graph import StateGraph, START, END
from shared import (
OrchestratorState,
build_initial_state,
get_operation,
get_random_number,
execute_operation,
)
# --- 路由逻辑 ---
def should_continue(state: OrchestratorState) -> str:
"""获取操作后,决定:继续还是停止。"""
if state.get("operation") == "end":
return "finish"
return "continue"
# --- 构建图 ---
builder = StateGraph(OrchestratorState)
# 添加节点
builder.add_node("get_operation", get_operation)
builder.add_node("get_random_number", get_random_number)
builder.add_node("execute_operation", execute_operation)
# 定义边 — 顺序链
builder.add_edge(START, "get_operation")
# 获取操作后,决定:继续还是结束?
builder.add_conditional_edges(
"get_operation",
should_continue,
{
"continue": "get_random_number",
"finish": END,
},
)
builder.add_edge("get_random_number", "execute_operation")
# 执行后,循环回到 get_operation
builder.add_edge("execute_operation", "get_operation")
# 编译
graph = builder.compile()
# --- 运行 ---
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--start-number", type=float, default=None,
help="Initial number to start with (prompts if not provided)",
)
parser.add_argument(
"--manual-input", action="store_true",
help="Always prompt user for numbers (shorthand for --manual-input-chance 1.0)",
)
parser.add_argument(
"--manual-input-chance", type=float, default=0.0,
help="Probability (0.0-1.0) of prompting user for each number (default: 0.0)",
)
args = parser.parse_args()
start = args.start_number
if start is None:
start = float(input("Enter starting number: "))
chance = 1.0 if args.manual_input else args.manual_input_chance
result = graph.invoke(build_initial_state(
current_number=start,
manual_input_chance=chance,
))
print("\n🧮 Math Engine Complete! (Sequential)\n")
print("Computation History:")
for entry in result["history"]:
print(entry)
print(f"\nFinal Result: {result['current_number']}")
顺序流的好处是逻辑直白,而且最后一次迭代拿到
end
时可以直接跳过取随机数的调用,省掉一次无用的 HTTP 请求。代价是每轮迭代的两次远程调用必须串行,一个等另一个。
并行执行
math_orchestrator/orchestrator_parallel.py
并行版本利用 LangGraph 的 fan-out/fan-in 模式。
get_operation
和
get_random_number
在同一个 superstep 中同时执行,两者都完成后
execute_operation
再决定是继续 fan-out 还是终止。
import argparse
from langgraph.graph import StateGraph, START, END
from shared import (
OrchestratorState,
build_initial_state,
get_operation,
get_random_number,
execute_operation,
)
# --- 路由逻辑 ---
def should_continue(state: OrchestratorState) -> list[str] | str:
"""
决定是继续循环还是停止。
返回节点名称列表用于 fan-out(并行),
或返回 END 以终止。
"""
if state.get("operation") == "end":
return END
# Fan-out: 并行路由到两个节点
return ["get_operation", "get_random_number"]
# --- 构建图 ---
builder = StateGraph(OrchestratorState)
# 添加节点
builder.add_node("get_operation", get_operation)
builder.add_node("get_random_number", get_random_number)
builder.add_node("execute_operation", execute_operation)
# 定义边
# Fan-out: START 并行发送到两个节点
builder.add_edge(START, "get_operation")
builder.add_edge(START, "get_random_number")
# Fan-in: 两个节点都必须完成后 execute_operation 才能运行
builder.add_edge("get_operation", "execute_operation")
builder.add_edge("get_random_number", "execute_operation")
# 执行后,决定:再次 fan-out,还是结束
builder.add_conditional_edges(
"execute_operation",
should_continue,
["get_operation", "get_random_number", END],
)
# 编译
graph = builder.compile()
# --- 运行 ---
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--start-number", type=float, default=None,
help="Initial number to start with (prompts if not provided)",
)
parser.add_argument(
"--manual-input", action="store_true",
help="Always prompt user for numbers (shorthand for --manual-input-chance 1.0)",
)
parser.add_argument(
"--manual-input-chance", type=float, default=0.0,
help="Probability (0.0-1.0) of prompting user for each number (default: 0.0)",
)
args = parser.parse_args()
start = args.start_number
if start is None:
start = float(input("Enter starting number: "))
chance = 1.0 if args.manual_input else args.manual_input_chance
result = graph.invoke(build_initial_state(
current_number=start,
manual_input_chance=chance,
))
print("\n🧮 Math Engine Complete! (Parallel)\n")
print("Computation History:")
for entry in result["history"]:
print(entry)
print(f"\nFinal Result: {result['current_number']}")
运行结果
打开两个终端窗口:
终端 1——启动远程 math_service(如果此前没启动的话):
cd math_service
langgraph dev --port 2024 --no-browser
终端 2——运行编排器(任选其一):
cd math_orchestrator
# 选项 A: 顺序 — 提示输入起始数字
python orchestrator_sequential.py
# 选项 B: 并行 — 通过参数指定起始数字
python orchestrator_parallel.py --start-number 100
# 任何选项配合手动输入模式 — 每次提示你输入数字:
python orchestrator_sequential.py --start-number 100 --manual-input
# 混合模式 — 每次迭代有 50% 的概率提示你:
python orchestrator_sequential.py --start-number 100 --manual-input-chance 0.5
一次典型运行的输出如下:
→ Operation: subtract
→ Number: 10
100.0 - 10 = 90.0
→ Operation: subtract
Current: 90.0 | Operation: subtract | Enter a number: 1
→ Number: 1 (manual)
90.0 - 1 = 89.0
→ Operation: add
Current: 89.0 | Operation: add | Enter a number: 1
→ Number: 1 (manual)
89.0 + 1 = 90.0
→ Operation: divide
→ Number: 17
90.0 ÷ 17 = 5.29
→ Operation: add
→ Number: 5
5.29 + 5 = 10.29
→ Operation: divide
Current: 10.29 | Operation: divide | Enter a number: 1
→ Number: 1 (manual)
10.29 ÷ 1 = 10.29
→ Operation: subtract
Current: 10.29 | Operation: subtract | Enter a number: 1
→ Number: 1 (manual)
10.29 - 1 = 9.29
→ Operation: multiply
Current: 9.29 | Operation: multiply | Enter a number: 1
→ Number: 1 (manual)
9.29 × 1 = 9.29
→ Operation: multiply
Current: 9.29 | Operation: multiply | Enter a number: 2
→ Number: 2 (manual)
9.29 × 2 = 18.58
→ Operation: multiply
→ Number: 10
18.58 × 10 = 185.79999999999998
→ Operation: end
🧮 Math Engine Complete! (Sequential)
Computation History:
Starting number: 100.0 (mode: mixed (50% manual))
100.0 - 10 = 90.0
90.0 - 1 = 89.0
89.0 + 1 = 90.0
90.0 ÷ 17 = 5.29
5.29 + 5 = 10.29
10.29 ÷ 1 = 10.29
10.29 - 1 = 9.29
9.29 × 1 = 9.29
9.29 × 2 = 18.58
18.58 × 10 = 185.79999999999998
Final Result: 185.79999999999998
运算和数字都由远程服务随机生成,每次运行的结果不同。
从控制台到生产环境:使用 interrupt
input()
适合本地脚本调试。到了生产环境——编排器可能藏在 REST API、Web UI 或聊天界面后面——没有控制台可用。LangGraph 对此有一个一等原语:
interrupt
。
机制不复杂:节点调用
interrupt()
时 LangGraph 暂停整个图,将完整状态写入 checkpoint,然后把控制权交还给调用方。调用方(API 服务、Web 应用等)拿到暂停信号后向用户展示提示,收到响应后用
Command(resume=...)
恢复执行。图从
interrupt()
调用处精确恢复,哪怕过了几个小时、换了一台机器也没问题。
以下是用
interrupt
替换
input()
后
get_random_number
的写法:
math_orchestrator/shared_interrupt.py(相关摘录)
import sqlite3
from langgraph.types import interrupt, Command
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.pregel.remote import RemoteGraph
math_service = RemoteGraph(
"math_service",
url="http://localhost:2024",
)
def get_random_number(state):
"""
获取下一个数字 — 通过远程图或人工中断。
当远程图返回 ask_user=True 时,我们不调用 input(),
而是调用 interrupt(),它会暂停整个图并向调用应用程序
呈现一个提示。
"""
chance = state.get("manual_input_chance", 0.0)
result = math_service.invoke({
"action": "generate_number",
"operation": "",
"number": 0,
"manual_input_chance": chance,
"ask_user": False,
})
if result.get("ask_user"):
# 暂停图 — 调用者接收此提示
num = interrupt({
"prompt": "Enter a number",
"current_number": state.get("current_number"),
"operation": state.get("operation"),
})
print(f" → Number: {num} (manual)")
return {"random_number": int(num)}
num = result["number"]
print(f" → Number: {num}")
return {"random_number": num}
编译图时必须附带 checkpointer(状态持久化),调用时必须指定 thread_id(标识具体会话):
# 带 checkpointer 编译
checkpointer = SqliteSaver(sqlite3.connect("math_engine.db"))
graph = builder.compile(checkpointer=checkpointer)
# 启动新线程
config = {"configurable": {"thread_id": "session-42"}}
# 首次调用 — 运行直到 interrupt() 被调用
result = graph.invoke(
build_initial_state(current_number=100, manual_input_chance=1.0),
config=config,
)
# 图现在已暂停。result["__interrupt__"] 包含提示:
# [Interrupt(value={"prompt": "Enter a number", "current_number": 100, ...})]
# ... 时间流逝,用户通过 Web UI、API 等提供输入 ...
# 使用用户的值恢复
result = graph.invoke(Command(resume=42), config=config)
# 图从 interrupt() 被调用的地方继续执行,
# num = 42,并运行直到下一个 interrupt 或 END。
几个要点。
interrupt()
和
input()
目的相同,区别在于前者走 HTTP 而非 stdin——传入一个 payload(提示、上下文等),调用方通过
Command(resume=...)
回传用户输入。checkpointer 负责持久化图状态,LangGraph 支持 SQLite、Postgres 等多种后端。thread_id 用来标识会话,多个用户可以各自持有独立的暂停/运行中的图实例。远程图和图的连接方式无需任何改动,变化只发生在节点函数和调用模式上。
保护线程:认证与授权
如果 thread ID 是保护暂停会话的唯一手段,任何猜中或截获了 thread ID 的人都能恢复别人的图、注入自己的值。LangGraph Platform 对此有内置的认证与授权层。
认证系统分两步走。
@auth.authenticate
处理程序作为中间件在每个请求上运行,验证调用者的凭据(JWT token、API 密钥、OAuth2 等)并返回用户身份;
@auth.on
处理程序执行资源级访问控制,给每个线程打上所有者标记,过滤访问权限,确保用户只能看到和恢复自己的线程。
线程级安全的实现如下:
from langgraph_sdk import Auth
auth = Auth()
@auth.authenticate
async def authenticate(authorization: str) -> Auth.types.MinimalUserDict:
"""验证 Bearer token 并返回用户信息。"""
token = authorization.split(" ", 1)[1]
user = await verify_jwt(token) # 你的 JWT 验证逻辑
return {
"identity": user["sub"],
"is_authenticated": True,
}
@auth.on.threads.create
async def on_thread_create(
ctx: Auth.types.AuthContext,
value: Auth.types.on.threads.create.value,
):
"""为每个新线程标记创建者的身份。
`value` 是线程创建载荷 — 一个包含来自 API 请求的字段的字典:
thread_id, metadata, if_exists 等。
我们修改 value["metadata"] 以在存储之前标记所有者。
返回值是 LangGraph 写入线程 metadata 的元数据过滤器。
"""
value.setdefault("metadata", {})["owner"] = ctx.user.identity
return {"owner": ctx.user.identity}
@auth.on.threads.read
async def on_thread_read(
ctx: Auth.types.AuthContext,
value: Auth.types.on.threads.read.value,
):
"""过滤线程,使用户只能看到自己的。
`value` 是读取请求载荷 — 一个包含 thread_id 和来自
API 请求的任何 metadata 的字典。
返回值不是检查 — 而是查询过滤器。
LangGraph 在存储层应用它:只有 metadata.owner 与
ctx.user.identity 匹配的线程才会被返回。
其他用户拥有的线程是不可见的,而不仅仅是被阻止。
"""
return {"owner": ctx.user.identity}
@auth.on.threads.create_run
async def on_thread_resume(
ctx: Auth.types.AuthContext,
value: Auth.types.on.threads.create_run.value,
):
"""过滤用户可以在哪些线程上恢复运行。
`value` 是运行创建载荷 — 一个包含 thread_id, assistant_id,
input, command, metadata, config 等的字典。
相同的过滤器机制:LangGraph 仅在线程的 metadata.owner
与返回的过滤器匹配时才允许运行。如果用户尝试恢复
另一个用户的线程,平台会拒绝请求,因为该线程
未通过过滤器。
"""
metadata = value.setdefault("metadata", {})
metadata["owner"] = ctx.user.identity
return {"owner": ctx.user.identity}
在
langgraph.json
中注册:
{
"auth": {
"path": "src/security/auth.py:auth"
}
}
配置完成后,即使攻击者拿到了其他用户的 thread ID 也无法读取线程状态或恢复运行——授权处理程序会因为 owner metadata 不匹配而拒绝请求。过滤发生在平台层,不在图代码中,无法通过构造 API 请求绕过。
生产部署时 LangGraph Platform 可以对接 Auth0、Supabase 以及任何 OAuth2/JWT 认证体系。记住一点:thread ID 是标识符,不是密钥——安全保障来自认证层对访问权限的把控。
前面基于控制台
input()
的版本已经是这一模式的可运行原型。迁移到
interrupt
只需改动节点函数和调用模式,架构其余部分——远程图、fan-out/fan-in、错误处理——全部保持原样。
RemoteGraph 的工作原理
langgraph.pregel.remote
中的
RemoteGraph
类是整个组合能力的底层支撑。它实现的接口和本地编译的图完全一致,可以
.invoke()
、
.stream()
,也可以直接嵌入为另一个图的子图节点。通信通过 HTTP对接 LangGraph Server API。
from langgraph.pregel.remote import RemoteGraph
remote = RemoteGraph(
"math_service", # 来自 langgraph.json 的 assistant ID
url="http://localhost:2024",
)
# 像使用普通图一样使用它 — action 字段控制行为
result = remote.invoke({"action": "pick_operation", "operation": "", "number": 0})
RemoteGraph
遵循
Runnable
接口,可以直接作为节点添加到另一个图中:
builder.add_node("my_remote_node", remote_graph)
编排器图不需要了解远程图的内部实现细节——它可以是一个简单状态机、一个 LLM 驱动的 agent,或者任何介于两者之间的东西。而通过条件路由,一个远程图部署就能承载多种操作。
第二个调用依赖于第一个调用的结果时选顺序执行,或者想在最后一轮
end
时省掉无用的远程调用也该选顺序。两个调用互相独立、想缩短总耗时就选并行——在生产环境中远程图调用可能涉及 LLM 推理或数据库查询,并行执行差不多能把每轮延迟砍掉一半。
错误处理
并行执行带来一个自然的问题:远程图调用失败了怎么办?
math_service
临时挂掉\网络请求超时,这些情况都需要考虑。LangGraph 的处理提供了多种应对策略。
默认行为:原子 superstep
LangGraph 中并行节点在一个 superstep 里共同执行。superstep 中任何一个节点抛出异常,整个 superstep 原子性失败,不会有部分状态写入。假如
get_random_number
成功、
get_operation
失败,两边的结果都不会写入状态,避免了有随机数却没运算符这种不一致。
配了 checkpointer 的情况下 LangGraph 会在内部保存成功节点的结果。恢复执行时只有失败分支重试,成功分支的工作不必重复。
策略 1:RetryPolicy(图原生重试)
最干净的做法是对容易出错的节点附加
RetryPolicy
。LangGraph 接管重试循环,支持配置尝试次数、退避策略和抖动。只有失败分支重试,成功的并行节点不会重新执行。
重试全部耗尽后异常向上传播,图调用失败。对网络超时、5xx 错误这类瞬态故障,这是恰当的处理方式。
策略 2:节点内 Try/Except(降级处理)
需要图在远程服务不可用时仍然继续运行的场景下,在节点内部捕获异常并返回降级值即可。操作调用失败则引擎停止循环;数字生成调用失败则用安全默认值代替。
策略 3:两者结合(生产环境推荐)
生产中通常既要重试也要降级。
RetryPolicy
透明处理瞬态故障,重试全部耗尽后异常才落到节点内部的
try/except
块,由它提供兜底逻辑。
以下是三种策略的完整可运行示例。
总结
RemoteGraph 让分布式 agent 架构的组合变得相当简洁——顺序还是并行的连接方式随意切换,
RetryPolicy
加上节点内降级逻辑构成两层容错。单个远程图通过条件路由就能承载多种操作,基础设施不必铺得很大,编排逻辑也能保持清晰。
这个数学引擎作为 demo 虽然简单,展示的模式却可以直接迁移到正式系统——微服务化的 AI 编排,每个图作为一个独立部署的 agent 逻辑单元,根据延迟和成本需求选择合适的执行策略。
本文完整代码:
https://github.com/sentipy/langgraph_random_math
by Alexander Machekhin