你的 LangGraph workflow 在第 15 步崩溃后重启,框架从 checkpoint 恢复,但 Agent 用的是重启前内存里的「已处理文件列表」,而 checkpoint 里记录的是另一个版本——两个来源的数据不一致,Agent 重新处理了已经完成的文件,又跳过了崩溃时正在处理的文件。Temporal 里也有类似情况:worker 崩溃重启后,workflow 状态从持久化存储恢复,但 activity 的本地缓存变量丢失了,导致 activity 用空值重跑。这类状态漂移问题的共同特征是:重启后「说自己记得」的状态和「实际发生过的」状态之间有缺口。
常见原因
1. 部分状态在内存里,部分在持久化存储里
最常见的根因。Agent 把「已处理的 ID 集合」存在 Python 的 set 对象里(重启后消失),同时把「总体进度」存在 Redis 里(重启后保留)。重启后,Redis 显示任务 70% 完成,但内存里的 ID 集合是空的,Agent 重新开始处理所有 ID。
怎么判断:列出所有状态变量,标记每个变量的存储位置(内存/文件/Redis/数据库)。如果有任何运行时需要的变量只在内存里,重启后就会丢失。
2. Checkpoint 写入时机不对(写后崩溃 or 崩溃前未写)
Checkpoint 在每 10 步写一次,但 Agent 在第 7 步崩溃。重启后恢复到第 0 步的 checkpoint,重新执行 7 步已经完成的工作,可能产生副作用(如重复发邮件、重复写数据库)。
怎么判断:查看最近一次 checkpoint 的时间戳,与崩溃时间对比,计算丢失的步骤数。如果步骤数大于 0,就是写入粒度太粗。
3. 外部资源的状态没有纳入 checkpoint
Agent 创建了一个临时文件 /tmp/work_state.json,重启后这个文件可能被系统清理,或者在另一台 worker 上根本不存在。Checkpoint 里只有「任务进行中」的标记,没有文件的内容。
怎么判断:检查 Agent 依赖的所有外部资源(文件、临时目录、数据库行、外部 API session),确认它们在 checkpoint 里有对应的记录或能被重建。
4. 状态版本不兼容
代码升级后,新版本的状态 schema 和旧版本的 checkpoint 数据格式不匹配。state["new_field"] 在旧 checkpoint 里不存在,Agent 拿到 None 或 KeyError,然后用错误的默认值继续执行。
怎么判断:比较当前代码里 WorkflowState 的 schema 与最新 checkpoint 的实际字段列表,检查是否有新增字段在旧 checkpoint 里缺失。
5. 并发写入导致 checkpoint 内容不一致
多个线程或进程同时更新同一个 checkpoint,写操作没有加锁,导致部分字段来自一个写入者、部分字段来自另一个写入者,checkpoint 处于不一致状态。
怎么判断:检查 checkpoint 文件或数据库记录的 updated_at 时间戳,是否存在同一毫秒内的多次写入。
6. 重启后没有做状态校验就直接继续执行
Agent 盲目信任 checkpoint 里的状态,不做任何有效性验证就继续执行。如果 checkpoint 本身是损坏或过期的(如来自不同环境的 checkpoint 被意外加载),Agent 会用错误的状态跑完整个 workflow。
怎么判断:检查恢复逻辑里是否有 validate_state() 调用。如果没有,就是盲目信任。
最短修复路径
Step 1:把所有运行时状态统一到单一持久化 store
from dataclasses import dataclass, field
from typing import Any
import json, pathlib
@dataclass
class AgentState:
"""所有状态字段,必须可序列化。禁止在这个类之外保存运行时状态。"""
task_id: str
processed_ids: list[str] = field(default_factory=list)
current_step: int = 0
total_steps: int = 0
last_checkpoint_at: str = ""
schema_version: int = 2 # 每次修改 schema 时递增
def save(self, path: str):
pathlib.Path(path).write_text(
json.dumps(self.__dict__, ensure_ascii=False, indent=2)
)
@classmethod
def load(cls, path: str) -> "AgentState":
data = json.loads(pathlib.Path(path).read_text())
# 版本迁移
if data.get("schema_version", 1) < 2:
data["processed_ids"] = data.pop("done_ids", [])
data["schema_version"] = 2
return cls(**data)
Step 2:每步执行后立即写 checkpoint(而不是每 N 步)
async def run_step(state: AgentState, step_fn, checkpoint_path: str):
"""执行一步并立即持久化,保证崩溃后最多丢失 1 步。"""
result = await step_fn(state)
state.current_step += 1
state.last_checkpoint_at = datetime.utcnow().isoformat()
state.save(checkpoint_path) # 立即写,不要批量
return result
Step 3:恢复时做完整性校验
def restore_and_validate(checkpoint_path: str) -> AgentState:
state = AgentState.load(checkpoint_path)
# 1. schema 版本检查
assert state.schema_version == CURRENT_SCHEMA_VERSION, \
f"checkpoint schema 版本 {state.schema_version} 与当前代码 {CURRENT_SCHEMA_VERSION} 不匹配"
# 2. 外部资源存在性检查
for item_id in state.processed_ids:
assert db.exists(item_id), f"checkpoint 记录 {item_id} 已处理,但数据库里找不到对应记录"
# 3. 步骤范围检查
assert 0 <= state.current_step <= state.total_steps, \
f"current_step={state.current_step} 超出合法范围 [0, {state.total_steps}]"
return state
Step 4:用事务性更新防止并发写入不一致
import filelock
def atomic_save(state: AgentState, path: str):
"""原子性写入:先写临时文件,再原子重命名,防止写到一半崩溃。"""
tmp_path = path + ".tmp"
lock = filelock.FileLock(path + ".lock")
with lock:
pathlib.Path(tmp_path).write_text(
json.dumps(state.__dict__, ensure_ascii=False)
)
pathlib.Path(tmp_path).rename(path) # 原子操作
Step 5:在 Temporal 里用 workflow 参数传递状态,不用 activity 局部变量
# 错误:把状态放在 activity 的局部变量里(重启后丢失)
# processed = set() # activity 局部变量
#
# 正确:状态通过 workflow 的返回值链传递
@workflow.defn
class ProcessWorkflow:
@workflow.run
async def run(self, input: ProcessInput) -> ProcessResult:
processed_ids = []
for item in input.items:
result = await workflow.execute_activity(
process_item,
ProcessItemInput(item_id=item.id, already_done=processed_ids),
schedule_to_close_timeout=timedelta(minutes=5)
)
processed_ids.append(item.id) # 状态在 workflow 层累积,Temporal 自动持久化
return ProcessResult(processed_ids=processed_ids)
预防建议
- 禁止在 Agent 代码里使用模块级全局变量保存运行时状态,所有状态必须通过
AgentState对象持久化 - 每次修改
AgentStateschema 时递增版本号,并在load()里实现向后兼容的迁移逻辑 - 用「每步写 checkpoint」替代「每 N 步写 checkpoint」,N 越大丢失的工作越多
- 为所有 checkpoint 写原子性操作(tmp + rename),防止写到一半崩溃导致损坏
- 在恢复逻辑里加业务层校验(不只是 schema 校验),确认 checkpoint 描述的历史与外部系统的实际状态一致
- 在 staging 环境定期做「混沌测试」:随机 kill Agent 进程,验证恢复后的行为是否正确
- Temporal/Inngest 场景:所有跨 activity 的状态通过 workflow 返回值链传递,不依赖 worker 本地内存
常见问答 (FAQ)
Q: LangGraph 的 MemorySaver 和 SqliteSaver 在重启场景下有什么区别?
A: MemorySaver 把 checkpoint 存在进程内存里,进程重启后全部丢失,只适用于测试。SqliteSaver 把 checkpoint 写入 SQLite 文件,重启后可以恢复。生产环境应使用 PostgresSaver 或 RedisSaver,支持多 worker 共享 checkpoint。
Q: 如果 checkpoint 和外部数据库状态不一致,应该以哪个为准?
A: 以外部数据库为准。数据库是事实来源(source of truth),checkpoint 只是执行进度的记录。恢复时,先从数据库查询哪些 item 已经被处理(通过唯一 ID + 处理状态字段),重建 processed_ids 列表,而不是直接信任 checkpoint 里的 processed_ids。
Q: schema_version 迁移代码需要保留多久? A: 需要保留到所有生产环境的 checkpoint 都已经升级到新版本。实践上,保留 2 个大版本的迁移代码是常见做法。如果运维上能确保重启前先清理旧 checkpoint,迁移代码可以更早删除。
Q: 如何检测生产环境里是否存在状态漂移? A: 在每次 workflow 完成时,运行一个对账任务:对比 checkpoint 记录的「已处理 ID 数量」与数据库里实际的「处理完成记录数量」。如果两个数字不一致,就存在状态漂移,需要人工介入调查。